e53557c4afc7110861cd23fa070945779e28e091
[platform/upstream/grpc.git] / tools / run_tests / run_xds_tests.py
1 #!/usr/bin/env python
2 # Copyright 2020 gRPC authors.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Run xDS integration tests on GCP using Traffic Director."""
16
17 import argparse
18 import googleapiclient.discovery
19 import grpc
20 import json
21 import logging
22 import os
23 import random
24 import shlex
25 import socket
26 import subprocess
27 import sys
28 import tempfile
29 import time
30 import uuid
31
32 from oauth2client.client import GoogleCredentials
33
34 import python_utils.jobset as jobset
35 import python_utils.report_utils as report_utils
36
37 from src.proto.grpc.health.v1 import health_pb2
38 from src.proto.grpc.health.v1 import health_pb2_grpc
39 from src.proto.grpc.testing import empty_pb2
40 from src.proto.grpc.testing import messages_pb2
41 from src.proto.grpc.testing import test_pb2_grpc
42
# Root-logger setup: emit timestamped records to stderr. `console_handler`
# and `formatter` are module-level names so later code can reuse the same
# formatter for additional handlers (e.g. per-test log files).
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
# Drop any pre-existing handlers so records are not duplicated.
logger.handlers = []
logger.addHandler(console_handler)
# Default to WARNING; --verbose lowers this to DEBUG after arg parsing.
logger.setLevel(logging.WARNING)
50
# Test cases selected by --test_case=all; expected to pass in every language.
_TEST_CASES = [
    'backends_restart',
    'change_backend_service',
    'gentle_failover',
    'load_report_based_failover',
    'ping_pong',
    'remove_instance_group',
    'round_robin',
    'secondary_locality_gets_no_requests_on_partial_primary_failure',
    'secondary_locality_gets_requests_on_primary_failure',
    'traffic_splitting',
]
# Valid test cases, but not in all. So the tests can only run manually, and
# aren't enabled automatically for all languages.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = [
    'path_matching',
    'header_matching',
    'circuit_breaking',
    'timeout',
    'fault_injection',
]

# Test cases that require the V3 API.  Skipped in older runs.
_V3_TEST_CASES = frozenset(['timeout', 'fault_injection'])

# Test cases that require the alpha API.  Skipped for stable API runs.
_ALPHA_TEST_CASES = frozenset(['timeout'])
80
81
def parse_test_cases(arg):
    """Parse a --test_case argument into an ordered list of test names.

    Args:
      arg: Comma-separated test case names. The special name 'all' expands
        to every test in _TEST_CASES (tests in _ADDITIONAL_TEST_CASES must
        be requested by name).

    Returns:
      The selected test names, de-duplicated and ordered as they appear in
      _TEST_CASES + _ADDITIONAL_TEST_CASES.

    Raises:
      Exception: if any requested name is not a known test case.
    """
    if arg == '':
        return []
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    test_cases = set()
    # Use a distinct loop variable: the original code shadowed `arg`, so the
    # error message below reported only the last token instead of the full
    # user-supplied argument.
    for token in arg.split(','):
        if token == "all":
            test_cases.update(_TEST_CASES)
        else:
            test_cases.add(token)
    if not all(test_case in all_test_cases for test_case in test_cases):
        raise Exception('Failed to parse test cases %s' % arg)
    # Preserve order.
    return [x for x in all_test_cases if x in test_cases]
97
98
def parse_port_range(port_arg):
    """Parse a --service_port_range argument into an inclusive port range.

    Args:
      port_arg: Either a single port ('8080') or a 'min:max' pair
        ('8080:8110').

    Returns:
      A range covering the requested ports, inclusive of the maximum.

    Raises:
      ValueError: if port_arg is neither an int nor a 'min:max' pair of ints.
    """
    try:
        port = int(port_arg)
        return range(port, port + 1)
    # Only catch the int-conversion failure; the original bare `except:`
    # also swallowed KeyboardInterrupt/SystemExit.
    except ValueError:
        port_min, port_max = port_arg.split(':')
        return range(int(port_min), int(port_max) + 1)
106
107
# Command-line interface. Defaults target the shared grpc-testing project;
# most flags exist so CI environments can substitute their own resources.
argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
# TODO(zdapeng): remove default value of project_id and project_num
argp.add_argument('--project_id', default='grpc-testing', help='GCP project id')
argp.add_argument('--project_num',
                  default='830293263384',
                  help='GCP project number')
argp.add_argument(
    '--gcp_suffix',
    default='',
    help='Optional suffix for all generated GCP resource names. Useful to '
    'ensure distinct names across test runs.')
argp.add_argument(
    '--test_case',
    default='ping_pong',
    type=parse_test_cases,
    help='Comma-separated list of test cases to run. Available tests: %s, '
    '(or \'all\' to run every test). '
    'Alternative tests not included in \'all\': %s' %
    (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument(
    '--bootstrap_file',
    default='',
    help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in '
    'bootstrap generation')
argp.add_argument(
    '--xds_v3_support',
    default=False,
    action='store_true',
    help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. '
    'If a pre-created bootstrap file is provided via the --bootstrap_file '
    'parameter, it should include xds_v3 in its server_features field.')
argp.add_argument(
    '--client_cmd',
    default=None,
    help='Command to launch xDS test client. {server_uri}, {stats_port} and '
    '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP '
    'will be set for the command')
argp.add_argument(
    '--client_hosts',
    default=None,
    help='Comma-separated list of hosts running client processes. If set, '
    '--client_cmd is ignored and client processes are assumed to be running on '
    'the specified hosts.')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
                  default='us-west1-b',
                  help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=100, type=int, help='Client QPS')
argp.add_argument(
    '--wait_for_backend_sec',
    default=1200,
    type=int,
    help='Time limit for waiting for created backend services to report '
    'healthy when launching or updated GCP resources')
argp.add_argument(
    '--use_existing_gcp_resources',
    default=False,
    action='store_true',
    help=
    'If set, find and use already created GCP resources instead of creating new'
    ' ones.')
argp.add_argument(
    '--keep_gcp_resources',
    default=False,
    action='store_true',
    help=
    'Leave GCP VMs and configuration running after test. Default behavior is '
    'to delete when tests complete.')
argp.add_argument(
    '--compute_discovery_document',
    default=None,
    type=str,
    help=
    'If provided, uses this file instead of retrieving via the GCP discovery '
    'API')
argp.add_argument(
    '--alpha_compute_discovery_document',
    default=None,
    type=str,
    help='If provided, uses this file instead of retrieving via the alpha GCP '
    'discovery API')
argp.add_argument('--network',
                  default='global/networks/default',
                  help='GCP network to use')
argp.add_argument('--service_port_range',
                  default='8080:8110',
                  type=parse_port_range,
                  help='Listening port for created gRPC backends. Specified as '
                  'either a single int or as a range in the format min:max, in '
                  'which case an available port p will be chosen s.t. min <= p '
                  '<= max')
argp.add_argument(
    '--stats_port',
    default=8079,
    type=int,
    help='Local port for the client process to expose the LB stats service')
argp.add_argument('--xds_server',
                  default='trafficdirector.googleapis.com:443',
                  help='xDS server')
argp.add_argument('--source_image',
                  default='projects/debian-cloud/global/images/family/debian-9',
                  help='Source image for VMs created during the test')
argp.add_argument('--path_to_server_binary',
                  default=None,
                  type=str,
                  help='If set, the server binary must already be pre-built on '
                  'the specified source image')
argp.add_argument('--machine_type',
                  default='e2-standard-2',
                  help='Machine type for VMs created during the test')
argp.add_argument(
    '--instance_group_size',
    default=2,
    type=int,
    help='Number of VMs to create per instance group. Certain test cases (e.g., '
    'round_robin) may not give meaningful results if this is set to a value '
    'less than 2.')
argp.add_argument('--verbose',
                  help='verbose log output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument('--log_client_output',
                  help='Log captured client output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this flag once all test environments are verified to
# have access to the alpha compute APIs.
argp.add_argument('--only_stable_gcp_apis',
                  help='Do not use alpha compute APIs. Some tests may be '
                  'incompatible with this option (gRPC health checks are '
                  'currently alpha and required for simulating server failure)',
                  default=False,
                  action='store_true')
args = argp.parse_args()
244
if args.verbose:
    logger.setLevel(logging.DEBUG)

# Hosts exposing the test client's stats/configure services. Empty list
# means a single client on localhost (launched via --client_cmd).
CLIENT_HOSTS = []
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(',')

# Shared timeouts and sizing knobs used across test cases.
_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
# Number of RPCs sampled per load-distribution check (~10s of client traffic).
_NUM_TEST_RPCS = 10 * args.qps
_WAIT_FOR_STATS_SEC = 360
_WAIT_FOR_VALID_CONFIG_SEC = 60
_WAIT_FOR_URL_MAP_PATCH_SEC = 300
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
# xDS bootstrap JSON template. The %s slots (network name, zone, xDS server
# URI) are filled here at import time; {node_id} and {server_features} remain
# as str.format() placeholders filled per client run. Doubled braces are
# literal JSON braces.
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ],
    "server_features": {server_features}
  }}]
}}""" % (args.network.split('/')[-1], args.zone, args.xds_server)

# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend service
# can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin']
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
# Extra RPC metadata whose value is a number, sent with UnaryCall only.
_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
_TEST_METADATA_NUMERIC_VALUE = '159'
# Base names for generated GCP resources (per --gcp_suffix, a suffix may be
# appended to keep names distinct across runs).
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
_BASE_HEALTH_CHECK_NAME = 'test-hc'
_BASE_FIREWALL_RULE_NAME = 'test-fw-rule'
_BASE_BACKEND_SERVICE_NAME = 'test-backend-service'
_BASE_URL_MAP_NAME = 'test-map'
_BASE_SERVICE_HOST = 'grpc-test'
_BASE_TARGET_PROXY_NAME = 'test-target-proxy'
_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule'
# Sponge log locations for CI result reporting.
_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  '../../reports')
_SPONGE_LOG_NAME = 'sponge_log.log'
_SPONGE_XML_NAME = 'sponge_log.xml'
313
314
def get_client_stats(num_rpcs, timeout_sec):
    """Fetch LoadBalancerStats from the test client's stats service.

    Queries the first configured client host (localhost when --client_hosts
    is unset) and returns its LoadBalancerStatsResponse.
    """
    hosts = CLIENT_HOSTS or ['localhost']
    for host in hosts:
        target = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerStatsRequest()
            request.num_rpcs = num_rpcs
            request.timeout_sec = timeout_sec
            # Allow extra time beyond the stats-collection window for the
            # RPC itself to connect and complete.
            rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC
            logger.debug('Invoking GetClientStats RPC to %s:%d:', host,
                         args.stats_port)
            response = stub.GetClientStats(request,
                                           wait_for_ready=True,
                                           timeout=rpc_timeout)
            logger.debug('Invoked GetClientStats RPC to %s: %s', host, response)
            # Returns on the first iteration: only the first host's stats
            # are collected even when multiple hosts are configured.
            return response
335
336
def get_client_accumulated_stats():
    """Fetch LoadBalancerAccumulatedStats from the test client.

    Queries the first configured client host (localhost when --client_hosts
    is unset) and returns its LoadBalancerAccumulatedStatsResponse.
    """
    hosts = CLIENT_HOSTS or ['localhost']
    for host in hosts:
        target = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerAccumulatedStatsRequest()
            logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:',
                         host, args.stats_port)
            response = stub.GetClientAccumulatedStats(
                request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s',
                         host, response)
            # Returns on the first iteration: only the first host is queried.
            return response
354
355
def configure_client(rpc_types, metadata=(), timeout_sec=None):
    """Configure the test client's RPC behavior at runtime.

    Sends a ClientConfigureRequest to every configured client host
    (localhost when --client_hosts is unset).

    Args:
      rpc_types: list of ClientConfigureRequest.RpcType values the client
        should send.
      metadata: iterable of (rpc_type, key, value) tuples attached as RPC
        metadata for the given RPC type. The default was changed from a
        mutable list (the shared-mutable-default pitfall) to an empty
        tuple; it is only iterated, so callers are unaffected.
      timeout_sec: optional per-RPC timeout to configure on the client.
    """
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ['localhost']
    for host in hosts:
        with grpc.insecure_channel('%s:%d' %
                                   (host, args.stats_port)) as channel:
            stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel)
            request = messages_pb2.ClientConfigureRequest()
            request.types.extend(rpc_types)
            for rpc_type, md_key, md_value in metadata:
                md = request.metadata.add()
                md.type = rpc_type
                md.key = md_key
                md.value = md_value
            if timeout_sec:
                request.timeout_sec = timeout_sec
            logger.debug(
                'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s',
                host, args.stats_port, request)
            stub.Configure(request,
                           wait_for_ready=True,
                           timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s',
                         host)
382
383
class RpcDistributionError(Exception):
    """Raised when RPCs fail to reach the expected backends in time."""
386
387
def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
                                   allow_failures):
    """Poll client stats until exactly the given backends receive load.

    Retries until timeout_sec elapses. Raises RpcDistributionError carrying
    the last observed problem if the expected distribution is never seen.
    """
    deadline = time.time() + timeout_sec
    error_msg = None
    logger.debug('Waiting for %d sec until backends %s receive load' %
                 (timeout_sec, backends))
    while time.time() <= deadline:
        error_msg = None
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        missing = [b for b in backends if b not in rpcs_by_peer]
        if missing:
            error_msg = 'Backend %s did not receive load' % missing[0]
        elif len(rpcs_by_peer) > len(backends):
            error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
        # A failure count overrides any other finding when failures are not
        # tolerated.
        if not allow_failures and stats.num_failures > 0:
            error_msg = '%d RPCs failed' % stats.num_failures
        if not error_msg:
            return
    raise RpcDistributionError(error_msg)
409
410
def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
                                                     timeout_sec,
                                                     num_rpcs=_NUM_TEST_RPCS):
    """Wait for load on exactly the given backends, tolerating RPC failures."""
    _verify_rpcs_to_given_backends(
        backends, timeout_sec, num_rpcs, allow_failures=True)
418
419
def wait_until_all_rpcs_go_to_given_backends(backends,
                                             timeout_sec,
                                             num_rpcs=_NUM_TEST_RPCS):
    """Wait for load on exactly the given backends; any RPC failure is an error."""
    _verify_rpcs_to_given_backends(
        backends, timeout_sec, num_rpcs, allow_failures=False)
427
428
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
    """Block until the client keeps roughly num_rpcs RPCs in flight, stably.

    Args:
      rpc_type: RPC method to check, either 'UnaryCall' or 'EmptyCall'.
      timeout_sec: maximum seconds to wait for the desired state.
      num_rpcs: expected number of in-flight RPCs.
      threshold: tolerated deviation from num_rpcs, as a percentage in
        [0, 100].

    Raises:
      ValueError: if threshold is outside [0, 100].
      Exception: if the in-flight count never settles within tolerance.
    """
    if not 0 <= threshold <= 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    threshold_fraction = threshold / 100.0
    deadline = time.time() + timeout_sec
    error_msg = None
    logger.debug(
        'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' %
        (timeout_sec, num_rpcs, rpc_type, threshold))
    while time.time() <= deadline:
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
        if not error_msg:
            break
        logger.debug('Progress: %s', error_msg)
        time.sleep(2)
    # Re-check after a short pause to ensure the in-flight count is stable,
    # not just momentarily within tolerance.
    if not error_msg:
        time.sleep(5)
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
    if error_msg:
        raise Exception("Wrong number of %s RPCs in-flight: %s" %
                        (rpc_type, error_msg))
466
467
def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction):
    """Return an error message if in-flight RPCs deviate beyond tolerance.

    Returns None when the current in-flight count for rpc_type is within
    threshold_fraction of num_rpcs.
    """
    stats = get_client_accumulated_stats()
    started = stats.num_rpcs_started_by_method[rpc_type]
    succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
    failed = stats.num_rpcs_failed_by_method[rpc_type]
    # In-flight RPCs are those started but not yet finished either way.
    in_flight = started - succeeded - failed
    if in_flight < (num_rpcs * (1 - threshold_fraction)):
        return ('actual(%d) < expected(%d - %d%%)' %
                (in_flight, num_rpcs, threshold))
    if in_flight > (num_rpcs * (1 + threshold_fraction)):
        return ('actual(%d) > expected(%d + %d%%)' %
                (in_flight, num_rpcs, threshold))
    return None
482
483
def compare_distributions(actual_distribution, expected_distribution,
                          threshold):
    """Check that an observed distribution matches the expected one.

    Args:
      actual_distribution: list of floats, the observed distribution.
      expected_distribution: list of floats, the expected distribution.
      threshold: percentage in [0, 100] by which each actual value may
        deviate from its expected counterpart.

    Returns:
      True when every actual value lies within threshold percent of the
      corresponding expected value.

    Raises:
      ValueError: if threshold is not within [0, 100].
      Exception: if the lists differ in length, or a value deviates beyond
        the threshold (the message names the offending values).
    """
    if len(expected_distribution) != len(actual_distribution):
        raise Exception(
            'Error: expected and actual distributions have different size (%d vs %d)'
            % (len(expected_distribution), len(actual_distribution)))
    if not 0 <= threshold <= 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    fraction = threshold / 100.0
    for expected, actual in zip(expected_distribution, actual_distribution):
        lower_bound = expected * (1 - fraction)
        upper_bound = expected * (1 + fraction)
        if actual < lower_bound:
            raise Exception("actual(%f) < expected(%f-%d%%)" %
                            (actual, expected, threshold))
        if actual > upper_bound:
            raise Exception("actual(%f) > expected(%f+%d%%)" %
                            (actual, expected, threshold))
    return True
518
519
def compare_expected_instances(stats, expected_instances):
    """Compare if stats have expected instances for each type of RPC.

    Args:
      stats: LoadBalancerStatsResponse reported by interop client.
      expected_instances: a dict with key as the RPC type (string), value as
        the expected backend instances (list of strings).

    Returns:
      True if, for every RPC type, the set of peers that received RPCs
      equals the expected set. False otherwise.
    """
    for rpc_type, expected_peers in expected_instances.items():
        rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
        # Guard against a falsy/missing entry for this RPC type: the
        # original assigned None here and then crashed on None.keys();
        # treat it as "no peers received RPCs" instead.
        if rpcs_by_peer_for_type:
            rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer
        else:
            rpcs_by_peer = {}
        logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
        peers = list(rpcs_by_peer.keys())
        if set(peers) != set(expected_peers):
            logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
                        peers, expected_peers)
            return False
    return True
541
542
def test_backends_restart(gcp, backend_service, instance_group):
    """Verify traffic recovers after backends are scaled to zero and back.

    Scales the instance group to zero, expects no backend to receive load,
    then restores the original size and expects traffic on the recreated
    instances.
    """
    logger.info('Running test_backends_restart')
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    start_time = time.time()  # NOTE(review): appears unused — confirm before removing.
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    try:
        resize_instance_group(gcp, instance_group, 0)
        # With zero instances, no backend should receive load; individual
        # RPC failures are tolerated during the outage.
        wait_until_all_rpcs_go_to_given_backends_or_fail([],
                                                         _WAIT_FOR_BACKEND_SEC)
    finally:
        # Restore the group size even if the empty-backends check failed.
        resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    # Re-fetch names: instances may have been recreated by the resize.
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(new_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
560
561
def test_change_backend_service(gcp, original_backend_service, instance_group,
                                alternate_backend_service,
                                same_zone_instance_group):
    """Verify traffic switches backends when the URL map is repointed.

    Points the URL map from the original backend service to an alternate
    one and expects all RPCs to move to the alternate instances; restores
    the original mapping afterwards.
    """
    logger.info('Running test_change_backend_service')
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    # Traffic should initially reach only the original backends.
    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                             _WAIT_FOR_STATS_SEC)
    try:
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances,
                                                 _WAIT_FOR_URL_MAP_PATCH_SEC)
    finally:
        # Restore the URL map and detach the alternate service's backends.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
583
584
def test_gentle_failover(gcp,
                         backend_service,
                         primary_instance_group,
                         secondary_instance_group,
                         swapped_primary_and_secondary=False):
    """Verify partial failover when most primary instances stop serving.

    Ensures the primary group has at least 3 instances, stops serving on
    all but one (>50% failure), and expects traffic on the remaining
    primary instance plus the secondary locality. Serving status and group
    size are restored afterwards.

    Args:
      gcp: object holding the created GCP resources.
      backend_service: backend service patched with both instance groups.
      primary_instance_group: group expected to receive traffic initially.
      secondary_instance_group: failover locality's instance group.
      swapped_primary_and_secondary: set on the internal retry that swaps
        the primary/secondary roles (when TD treats the other group as
        primary); prevents infinite recursion.
    """
    logger.info('Running test_gentle_failover')
    num_primary_instances = len(get_instance_names(gcp, primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    try:
        if num_primary_instances < min_instances_for_gentle_failover:
            resize_instance_group(gcp, primary_instance_group,
                                  min_instances_for_gentle_failover)
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Stop serving on all but the last primary instance to trigger
        # the gentle (partial) failover.
        instances_to_stop = primary_instance_names[:-1]
        remaining_instances = primary_instance_names[-1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances + secondary_instance_names,
                _WAIT_FOR_BACKEND_SEC)
        finally:
            # Always restore serving status, even on failure.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(gcp,
                                 backend_service,
                                 secondary_instance_group,
                                 primary_instance_group,
                                 swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        # Restore the backend service, original group size, and verify
        # traffic returns to the primary instances.
        patch_backend_service(gcp, backend_service, [primary_instance_group])
        resize_instance_group(gcp, primary_instance_group,
                              num_primary_instances)
        instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
639
640
def test_load_report_based_failover(gcp, backend_service,
                                    primary_instance_group,
                                    secondary_instance_group):
    """Verify RATE balancing shifts traffic between localities by load.

    Caps the primary locality at 20% of client QPS and expects traffic to
    spill into the secondary locality; then raises the cap to 120% of QPS
    and expects all traffic to return to the primary locality.
    """
    logger.info('Running test_load_report_based_failover')
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Set primary locality's balance mode to RATE, and RPS to 20% of the
        # client's QPS. The secondary locality will be used.
        max_rate = int(args.qps * 1 / 5)
        logger.info('Patching backend service to RATE with %d max_rate',
                    max_rate)
        patch_backend_service(
            gcp,
            backend_service, [primary_instance_group, secondary_instance_group],
            balancing_mode='RATE',
            max_rate=max_rate)
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names + secondary_instance_names,
            _WAIT_FOR_BACKEND_SEC)

        # Set primary locality's balance mode to RATE, and RPS to 120% of the
        # client's QPS. Only the primary locality will be used.
        max_rate = int(args.qps * 6 / 5)
        logger.info('Patching backend service to RATE with %d max_rate',
                    max_rate)
        patch_backend_service(
            gcp,
            backend_service, [primary_instance_group, secondary_instance_group],
            balancing_mode='RATE',
            max_rate=max_rate)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
        logger.info("success")
    finally:
        # Restore default balancing with only the primary instance group.
        patch_backend_service(gcp, backend_service, [primary_instance_group])
        instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
689
690
def test_ping_pong(gcp, backend_service, instance_group):
    """Baseline test: all RPCs reach the given backends with no failures."""
    logger.info('Running test_ping_pong')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    # Strict check: allow_failures=False underneath, so any failed RPC
    # fails the test.
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
697
698
def test_remove_instance_group(gcp, backend_service, instance_group,
                               same_zone_instance_group):
    """Verify that traffic fails over when an instance group is removed.

    Attaches both instance groups to the backend service, determines which
    group(s) actually receive traffic, removes one group, and checks that
    all RPCs shift to the remaining group. The backend service is restored
    to [instance_group] on exit.

    Args:
      gcp: object holding shared GCP resource handles (project, zone, etc.).
      backend_service: the backend service under test.
      instance_group: the instance group expected to serve traffic.
      same_zone_instance_group: a second instance group in the same zone.
    """
    logger.info('Running test_remove_instance_group')
    # Resolve instance names before entering the try block: the cleanup in
    # the finally clause references instance_names, and resolving it here
    # prevents an UnboundLocalError from masking the original failure if an
    # API call below raises before the names are fetched.
    instance_names = get_instance_names(gcp, instance_group)
    same_zone_instance_names = get_instance_names(gcp,
                                                  same_zone_instance_group)
    try:
        patch_backend_service(gcp,
                              backend_service,
                              [instance_group, same_zone_instance_group],
                              balancing_mode='RATE')
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  same_zone_instance_group)
        try:
            # Normally traffic is spread over both groups.
            wait_until_all_rpcs_go_to_given_backends(
                instance_names + same_zone_instance_names,
                _WAIT_FOR_OPERATION_SEC)
            remaining_instance_group = same_zone_instance_group
            remaining_instance_names = same_zone_instance_names
        except RpcDistributionError:
            # If connected to TD in a different zone, we may route traffic to
            # only one instance group. Determine which group that is to continue
            # with the remainder of the test case.
            try:
                wait_until_all_rpcs_go_to_given_backends(
                    instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = same_zone_instance_group
                remaining_instance_names = same_zone_instance_names
            except RpcDistributionError:
                wait_until_all_rpcs_go_to_given_backends(
                    same_zone_instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = instance_group
                remaining_instance_names = instance_names
        # Remove the group currently serving traffic and verify failover.
        patch_backend_service(gcp,
                              backend_service, [remaining_instance_group],
                              balancing_mode='RATE')
        wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
    finally:
        patch_backend_service(gcp, backend_service, [instance_group])
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
742
743
def test_round_robin(gcp, backend_service, instance_group):
    """Check that RPCs are spread evenly (round robin) across all backends."""
    logger.info('Running test_round_robin')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    threshold = 1
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    # TODO(ericgribkoff) Delayed config propagation from earlier tests
    # may result in briefly receiving an empty EDS update, resulting in failed
    # RPCs. Retry distribution validation if this occurs; long-term fix is
    # creating new backend resources for each individual test case.
    # Each attempt takes 10 seconds. Config propagation can take several
    # minutes.
    max_attempts = 40
    for attempt in range(max_attempts):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        total_requests_received = sum(
            stats.rpcs_by_peer[peer] for peer in stats.rpcs_by_peer)
        if total_requests_received != _NUM_TEST_RPCS:
            # Some RPCs failed; retry rather than validating a partial sample.
            logger.info('Unexpected RPC failures, retrying: %s', stats)
            continue
        expected_requests = total_requests_received / len(instance_names)
        for instance in instance_names:
            deviation = abs(stats.rpcs_by_peer[instance] - expected_requests)
            if deviation > threshold:
                raise Exception(
                    'RPC peer distribution differs from expected by more than %d '
                    'for instance %s (%s)' % (threshold, instance, stats))
        return
    raise Exception('RPC failures persisted through %d retries' % max_attempts)
774
775
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Partial primary outage: surviving primary instances keep all traffic.

    Marks one primary instance as not serving and verifies that the
    remaining primary instances — not the secondary locality — continue to
    serve every RPC. Serving status and the backend service configuration
    are restored afterwards. If Traffic Director treats the other group as
    primary, the test retries once with the two groups swapped.
    """
    logger.info(
        'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
    )
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        stopped_instances = primary_instance_names[:1]
        surviving_instances = primary_instance_names[1:]
        try:
            # Take a single primary instance out of service.
            set_serving_status(stopped_instances,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(surviving_instances,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Restore serving status on every primary instance.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            raise
    finally:
        patch_backend_service(gcp, backend_service, [primary_instance_group])
821
822
def test_secondary_locality_gets_requests_on_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Full primary outage: the secondary locality takes over all traffic.

    Marks every primary instance as not serving and verifies that all RPCs
    shift to the secondary locality's instances. Serving status and the
    backend service configuration are restored afterwards. If Traffic
    Director treats the other group as primary, the test retries once with
    the two groups swapped.
    """
    logger.info('Running secondary_locality_gets_requests_on_primary_failure')
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        try:
            # Take the entire primary locality out of service.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Restore serving status on every primary instance.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            raise
    finally:
        patch_backend_service(gcp, backend_service, [primary_instance_group])
866
867
def prepare_services_for_urlmap_tests(gcp, original_backend_service,
                                      instance_group, alternate_backend_service,
                                      same_zone_instance_group):
    '''Bring both backend services to a known-good state for URL-map tests.

    Ensures the original service's backends are healthy, attaches
    same_zone_instance_group to the alternate service and waits for it to
    become healthy, then confirms that all traffic initially flows to the
    original backend service.

    Returns:
      Returns original and alternate backend names as lists of strings.
    '''
    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    logger.info('waiting for alternate to become healthy')
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)

    original_instances = get_instance_names(gcp, instance_group)
    logger.info('original backends instances: %s', original_instances)

    alternate_instances = get_instance_names(gcp, same_zone_instance_group)
    logger.info('alternate backends instances: %s', alternate_instances)

    # Every RPC should initially land on the original service's backends.
    logger.info('waiting for traffic to all go to original backends')
    wait_until_all_rpcs_go_to_given_backends(original_instances,
                                             _WAIT_FOR_STATS_SEC)
    return original_instances, alternate_instances
899
900
def test_traffic_splitting(gcp, original_backend_service, instance_group,
                           alternate_backend_service, same_zone_instance_group):
    """Verify weighted traffic splitting between two backend services.

    This test starts with all traffic going to original_backend_service,
    then updates the URL map to set the default action to a 20/80 traffic
    split between the original and alternate services. It waits for all
    backends in both services to receive traffic, then verifies that the
    observed per-instance distribution matches the configured weights
    (within a 5-point tolerance). The URL map and the alternate backend
    service are restored on exit.
    """
    logger.info('Running test_traffic_splitting')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # Patch urlmap, change route action to traffic splitting between
        # original and alternate.
        logger.info('patching url map with traffic splitting')
        original_service_percentage, alternate_service_percentage = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: original_service_percentage,
                alternate_backend_service: alternate_service_percentage,
            })
        # Split percentage between instances: [20,80] -> [10,10,40,40].
        expected_instance_percentage = [
            original_service_percentage * 1.0 / len(original_backend_instances)
        ] * len(original_backend_instances) + [
            alternate_service_percentage * 1.0 /
            len(alternate_backend_instances)
        ] * len(alternate_backend_instances)

        # Wait for traffic to go to both services.
        logger.info(
            'waiting for traffic to go to all backends (including alternate)')
        wait_until_all_rpcs_go_to_given_backends(
            original_backend_instances + alternate_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Verify that weights between two services are expected.
        retry_count = 10
        # Each attempt takes about 10 seconds, 10 retries is equivalent to 100
        # seconds timeout.
        for i in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            # The comprehension variable must not be named 'i': Python 2 list
            # comprehensions leak their variable into the enclosing scope,
            # which would clobber the retry counter with an instance-name
            # string and disable the final raise below.
            got_instance_count = [
                stats.rpcs_by_peer[instance]
                for instance in original_backend_instances
            ] + [
                stats.rpcs_by_peer[instance]
                for instance in alternate_backend_instances
            ]
            total_count = sum(got_instance_count)
            got_instance_percentage = [
                x * 100.0 / total_count for x in got_instance_count
            ]

            try:
                compare_distributions(got_instance_percentage,
                                      expected_instance_percentage, 5)
            except Exception as e:
                # Out of tolerance: log the mismatch and retry; only the last
                # attempt escalates to a failure.
                logger.info('attempt %d', i)
                logger.info('got percentage: %s', got_instance_percentage)
                logger.info('expected percentage: %s',
                            expected_instance_percentage)
                logger.info(e)
                if i == retry_count - 1:
                    raise Exception(
                        'RPC distribution (%s) differs from expected (%s)' %
                        (got_instance_percentage, expected_instance_percentage))
            else:
                logger.info("success")
                break
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
972
973
def test_path_matching(gcp, original_backend_service, instance_group,
                       alternate_backend_service, same_zone_instance_group):
    """Verify URL-map path matching: fullPath, prefix, regex, and ignoreCase.

    This test starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service. It then patches the URL map with route rules
    so that UnaryCall and EmptyCall are routed to different backend
    services, waits for all backends in both services to receive traffic,
    and verifies that each RPC method lands on the expected backends. The
    URL map and the alternate backend service are restored on exit.
    """
    logger.info('Running test_path_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # A list of tuples (route_rules, expected_instances). Each entry
        # maps RPC method name -> list of instance names expected to serve
        # that method once the route rules are in effect.
        test_cases = [
            (
                [{
                    'priority': 0,
                    # FullPath EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Prefix UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'prefixMatch': '/grpc.testing.TestService/Unary'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                # This test case is similar to the one above (but with route
                # services swapped). This test has two routes (full_path and
                # the default) to match EmptyCall, and both routes set
                # alternative_backend_service as the action. This forces the
                # client to handle duplicate Clusters in the RDS response.
                [
                    {
                        'priority': 0,
                        # Prefix UnaryCall -> original_backend_service.
                        'matchRules': [{
                            'prefixMatch': '/grpc.testing.TestService/Unary'
                        }],
                        'service': original_backend_service.url
                    },
                    {
                        'priority': 1,
                        # FullPath EmptyCall -> alternate_backend_service.
                        'matchRules': [{
                            'fullPathMatch':
                                '/grpc.testing.TestService/EmptyCall'
                        }],
                        'service': alternate_backend_service.url
                    }
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Regex UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'regexMatch':
                            '^\/.*\/UnaryCall$'  # Unary methods with any services.
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # ignoreCase EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        # Case insensitive matching.
                        'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl',
                        'ignoreCase': True,
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s', route_rules)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                # Per-method stats are required to tell which service handled
                # each RPC; clients without this support cannot run this test.
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    finally:
        # Restore the default URL map and detach the alternate backends.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
1112
1113
def test_header_matching(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify URL-map header matching (exact, prefix, suffix, present,
    invert, range, and regex matchers).

    This test starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service. It then patches the URL map with routes whose
    headerMatches key off the test metadata attached to the client's RPCs,
    so RPCs carrying the matching headers go to the alternate service. It
    waits for all backends in both services to receive traffic, then
    verifies that traffic goes to the expected backends. The URL map and
    the alternate backend service are restored on exit.
    """
    logger.info('Running test_header_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # A list of tuples (route_rules, expected_instances). Each entry
        # maps RPC method name -> list of instance names expected to serve
        # that method once the route rules are in effect.
        test_cases = [
            (
                [{
                    'priority': 0,
                    # Header ExactMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_EMPTY
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header PrefixMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header SuffixMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header 'xds_md_numeric' present -> alternate_backend_service.
                    # UnaryCall is sent with the metadata, so will be sent to alternative.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_NUMERIC_KEY,
                            'presentMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header invert ExactMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata, so will be sent to
                    # original. EmptyCall will be sent to alternative.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_UNARY,
                            'invertMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service.
                    # UnaryCall is sent with the metadata in range.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_NUMERIC_KEY,
                            'rangeMatch': {
                                'rangeStart': '100',
                                'rangeEnd': '200'
                            }
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header RegexMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName':
                                _TEST_METADATA_KEY,
                            'regexMatch':
                                "^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2],
                                              _TEST_METADATA_VALUE_EMPTY[-2:])
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s -> alternative',
                        route_rules[0]['matchRules'])
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                # Per-method stats are required to tell which service handled
                # each RPC; clients without this support cannot run this test.
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    finally:
        # Restore the default URL map and detach the alternate backends.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
1310
1311
def test_circuit_breaking(gcp, original_backend_service, instance_group,
                          same_zone_instance_group):
    '''
    Since backend service circuit_breakers configuration cannot be unset,
    which causes trouble for restoring the validate_for_proxy flag in the
    target proxy/global forwarding rule, this test uses dedicated backend
    services. The url_map and backend services undergo the following state
    changes:

    Before test:
       original_backend_service -> [instance_group]
       extra_backend_service -> []
       more_extra_backend_service -> []

       url_map -> [original_backend_service]

    In test:
       extra_backend_service (with circuit_breakers) -> [instance_group]
       more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group]

       url_map -> [extra_backend_service, more_extra_backend_service]

    After test:
       original_backend_service -> [instance_group]
       extra_backend_service (with circuit_breakers) -> []
       more_extra_backend_service (with circuit_breakers) -> []

       url_map -> [original_backend_service]
    '''
    logger.info('Running test_circuit_breaking')
    additional_backend_services = []
    try:
        # TODO(chengyuanzhang): Dedicated backend services created for circuit
        # breaking test. Once the issue for unsetting backend service circuit
        # breakers is resolved or configuring backend service circuit breakers is
        # enabled for config validation, these dedicated backend services can be
        # eliminated.
        extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix
        more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix
        extra_backend_service = add_backend_service(gcp,
                                                    extra_backend_service_name)
        additional_backend_services.append(extra_backend_service)
        more_extra_backend_service = add_backend_service(
            gcp, more_extra_backend_service_name)
        additional_backend_services.append(more_extra_backend_service)
        # The config validation for proxyless doesn't allow setting
        # circuit_breakers. Disable validate_for_proxyless
        # for this test. This can be removed when validation
        # accepts circuit_breakers.
        logger.info('disabling validate_for_proxyless in target proxy')
        set_validate_for_proxyless(gcp, False)
        extra_backend_service_max_requests = 500
        more_extra_backend_service_max_requests = 1000
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        logger.info('Waiting for extra backends to become healthy')
        wait_for_healthy_backends(gcp, extra_backend_service, instance_group)
        patch_backend_service(gcp,
                              more_extra_backend_service,
                              [same_zone_instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      more_extra_backend_service_max_requests
                              })
        logger.info('Waiting for more extra backend to become healthy')
        wait_for_healthy_backends(gcp, more_extra_backend_service,
                                  same_zone_instance_group)
        extra_backend_instances = get_instance_names(gcp, instance_group)
        more_extra_backend_instances = get_instance_names(
            gcp, same_zone_instance_group)
        route_rules = [
            {
                'priority': 0,
                # UnaryCall -> extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
                }],
                'service': extra_backend_service.url
            },
            {
                'priority': 1,
                # EmptyCall -> more_extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                }],
                'service': more_extra_backend_service.url
            },
        ]

        # Make client send UNARY_CALL and EMPTY_CALL.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ])
        logger.info('Patching url map with %s', route_rules)
        patch_url_map_backend_service(gcp,
                                      extra_backend_service,
                                      route_rules=route_rules)
        logger.info('Waiting for traffic to go to all backends')
        wait_until_all_rpcs_go_to_given_backends(
            extra_backend_instances + more_extra_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Make all calls keep-open.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
             'rpc-behavior', 'keep-open'),
            (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
             'rpc-behavior', 'keep-open')])
        # With keep-open RPCs, in-flight counts should plateau exactly at
        # each backend service's maxRequests threshold.
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state (%d)',
                    extra_backend_service_max_requests)
        wait_until_rpcs_in_flight(
            'EMPTY_CALL',
            (_WAIT_FOR_BACKEND_SEC +
             int(more_extra_backend_service_max_requests / args.qps)),
            more_extra_backend_service_max_requests, 1)
        logger.info('EMPTY_CALL reached stable state (%d)',
                    more_extra_backend_service_max_requests)

        # Increment circuit breakers max_requests threshold.
        extra_backend_service_max_requests = 800
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state after increase (%d)',
                    extra_backend_service_max_requests)
        logger.info('success')
        # Avoid new RPCs being outstanding (some test clients create threads
        # for sending RPCs) after restoring backend services.
        configure_client(
            [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL])
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, original_backend_service, [instance_group])
        for backend_service in additional_backend_services:
            delete_backend_service(gcp, backend_service)
        set_validate_for_proxyless(gcp, True)
1465
1466
def test_timeout(gcp, original_backend_service, instance_group):
    """Verify route-level maxStreamDuration and application deadlines.

    Patches the url map so UnaryCall gets maxStreamDuration=3s, then runs
    three sub-cases and compares per-status RPC counts from the client's
    accumulated stats against expectations (status 4 = DEADLINE_EXCEEDED,
    0 = OK):
      * app_timeout_exceeded: server sleeps 2s, app timeout is 1s.
      * timeout_not_exceeded: no sleep, calls succeed.
      * timeout_exceeded/timeout_different_route: both RPC types sleep 4s;
        UnaryCall (on the 3s maxStreamDuration route) fails, EmptyCall
        (matched by no rule above, so no maxStreamDuration) succeeds.
    The url map is restored in the finally block.
    """
    logger.info('Running test_timeout')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # UnaryCall -> maxStreamDuration:3s
    route_rules = [{
        'priority': 0,
        'matchRules': [{
            'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
        }],
        'service': original_backend_service.url,
        'routeAction': {
            'maxStreamDuration': {
                'seconds': 3,
            },
        },
    }]
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {expected_results})
    test_cases = [
        (
            'app_timeout_exceeded',
            # UnaryCall only with sleep-2; timeout=1s; calls timeout.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
                'metadata': [
                    (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                     'rpc-behavior', 'sleep-2'),
                ],
                'timeout_sec': 1,
            },
            {
                'UNARY_CALL': 4,  # DEADLINE_EXCEEDED
            },
        ),
        (
            'timeout_not_exceeded',
            # UnaryCall only with no sleep; calls succeed.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
            },
            {
                'UNARY_CALL': 0,
            },
        ),
        (
            'timeout_exceeded (UNARY_CALL), timeout_different_route (EMPTY_CALL)',
            # UnaryCall and EmptyCall both sleep-4.
            # UnaryCall timeouts, EmptyCall succeeds.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                    messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                ],
                'metadata': [
                    (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                     'rpc-behavior', 'sleep-4'),
                    (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                     'rpc-behavior', 'sleep-4'),
                ],
            },
            {
                'UNARY_CALL': 4,  # DEADLINE_EXCEEDED
                'EMPTY_CALL': 0,
            },
        ),
    ]

    try:
        for (testcase_name, client_config, expected_results) in test_cases:
            logger.info('starting case %s', testcase_name)
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config.  We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
            # second timeout.
            attempt_count = 20
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
                )
            for i in range(attempt_count):
                logger.info('%s: attempt %d', testcase_name, i)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                success = True
                for rpc, status in expected_results.items():
                    # Number of RPCs finishing with `status` during this
                    # attempt's window (stats are cumulative).
                    qty = (after_stats.stats_per_method[rpc].result[status] -
                           before_stats.stats_per_method[rpc].result[status])
                    want = test_runtime_secs * args.qps
                    # Allow 10% deviation from expectation to reduce flakiness
                    if qty < (want * .9) or qty > (want * 1.1):
                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
                                    testcase_name, rpc, status, qty, want)
                        success = False
                if success:
                    logger.info('success')
                    break
                logger.info('%s attempt %d failed', testcase_name, i)
                before_stats = after_stats
            # for/else: reached only when no attempt succeeded (loop never
            # hit the break above).
            else:
                raise Exception(
                    '%s: timeout waiting for expected results: %s; got %s' %
                    (testcase_name, expected_results,
                     after_stats.stats_per_method))
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
1589
1590
def test_fault_injection(gcp, original_backend_service, instance_group):
    """Verify xDS HTTP fault injection (abort and delay policies).

    Installs routes keyed on the 'fi_testcase' request header, each with a
    different faultInjectionPolicy (0%/100%/50% abort and delay), plus a
    fault-free catch-all. Each sub-case sends UnaryCalls carrying the
    header for one route and checks the observed per-status RPC counts
    against expected proportions ({status code: expected fraction}).
    validate_for_proxyless is disabled while the routes are installed and
    restored, along with the url map, in the finally block.
    """
    logger.info('Running test_fault_injection')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    testcase_header = 'fi_testcase'

    def _route(pri, name, fi_policy):
        # Route matching every path, gated on testcase_header == name, with
        # the given fault injection policy attached.
        return {
            'priority': pri,
            'matchRules': [{
                'prefixMatch':
                    '/',
                'headerMatches': [{
                    'headerName': testcase_header,
                    'exactMatch': name,
                }],
            }],
            'service': original_backend_service.url,
            'routeAction': {
                'faultInjectionPolicy': fi_policy
            },
        }

    def _abort(pct):
        # Abort pct% of RPCs with HTTP 401 (asserted below as gRPC
        # status 16, UNAUTHENTICATED).
        return {
            'abort': {
                'httpStatus': 401,
                'percentage': pct,
            }
        }

    def _delay(pct):
        # Delay pct% of RPCs by a fixed 20 seconds.
        return {
            'delay': {
                'fixedDelay': {
                    'seconds': '20'
                },
                'percentage': pct,
            }
        }

    zero_route = _abort(0)
    zero_route.update(_delay(0))
    route_rules = [
        _route(0, 'zero_percent_fault_injection', zero_route),
        _route(1, 'always_delay', _delay(100)),
        _route(2, 'always_abort', _abort(100)),
        _route(3, 'delay_half', _delay(50)),
        _route(4, 'abort_half', _abort(50)),
        {
            'priority': 5,
            'matchRules': [{
                'prefixMatch': '/'
            }],
            'service': original_backend_service.url,
        },
    ]
    set_validate_for_proxyless(gcp, False)
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {code: percent}).  Each
    # test case will set the testcase_header with the testcase_name for routing
    # to the appropriate config for the case, defined above.
    test_cases = [
        (
            'zero_percent_fault_injection',
            {},
            {
                0: 1
            },  # OK
        ),
        (
            'non_matching_fault_injection',  # Not in route_rules, above.
            {},
            {
                0: 1
            },  # OK
        ),
        (
            'always_delay',
            {
                'timeout_sec': 2
            },
            {
                4: 1
            },  # DEADLINE_EXCEEDED
        ),
        (
            'always_abort',
            {},
            {
                16: 1
            },  # UNAUTHENTICATED
        ),
        (
            'delay_half',
            {
                'timeout_sec': 2
            },
            {
                4: .5,
                0: .5
            },  # DEADLINE_EXCEEDED / OK: 50% / 50%
        ),
        (
            'abort_half',
            {},
            {
                16: .5,
                0: .5
            },  # UNAUTHENTICATED / OK: 50% / 50%
        )
    ]

    try:
        # The first case gets 120 attempts instead of 20 — presumably to
        # allow the new route config to reach the client; confirm intent.
        first_case = True
        for (testcase_name, client_config, expected_results) in test_cases:
            logger.info('starting case %s', testcase_name)

            client_config['metadata'] = [
                (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                 testcase_header, testcase_name)
            ]
            client_config['rpc_types'] = [
                messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            ]
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config.  We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
            # second timeout.
            attempt_count = 20
            if first_case:
                attempt_count = 120
                first_case = False
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
                )
            for i in range(attempt_count):
                logger.info('%s: attempt %d', testcase_name, i)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                success = True
                for status, pct in expected_results.items():
                    rpc = 'UNARY_CALL'
                    # Number of RPCs finishing with `status` during this
                    # attempt's window (stats are cumulative).
                    qty = (after_stats.stats_per_method[rpc].result[status] -
                           before_stats.stats_per_method[rpc].result[status])
                    want = pct * args.qps * test_runtime_secs
                    # Allow 10% deviation from expectation to reduce flakiness
                    VARIANCE_ALLOWED = 0.1
                    if abs(qty - want) > want * VARIANCE_ALLOWED:
                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
                                    testcase_name, rpc, status, qty, want)
                        success = False
                if success:
                    logger.info('success')
                    break
                logger.info('%s attempt %d failed', testcase_name, i)
                before_stats = after_stats
            # for/else: reached only when no attempt succeeded (loop never
            # hit the break above).
            else:
                raise Exception(
                    '%s: timeout waiting for expected results: %s; got %s' %
                    (testcase_name, expected_results,
                     after_stats.stats_per_method))
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        set_validate_for_proxyless(gcp, True)
1769
1770
def set_validate_for_proxyless(gcp, validate_for_proxyless):
    """Recreate the target proxy with the given validateForProxyless value.

    Only effective when the alpha compute API is in use; otherwise this is
    a no-op. Because patching target_grpc_proxy isn't supported, the global
    forwarding rule and target proxy are deleted and recreated with the new
    flag (forwarding rule first, since it references the proxy).

    Args:
      gcp: the GcpState holding the compute clients and created resources.
      validate_for_proxyless: bool flag to set on the recreated proxy.
    """
    if not gcp.alpha_compute:
        # Fixed message: the flag being skipped is validateForProxyless,
        # not "validateForProxy".
        logger.debug(
            'Not setting validateForProxyless because alpha is not enabled')
        return
    delete_global_forwarding_rule(gcp)
    delete_target_proxy(gcp)
    create_target_proxy(gcp, gcp.target_proxy.name, validate_for_proxyless)
    create_global_forwarding_rule(gcp, gcp.global_forwarding_rule.name,
                                  [gcp.service_port])
1784
1785
def get_serving_status(instance, service_port):
    """Query the standard gRPC health service of one test server.

    Returns the HealthCheckResponse from instance:service_port.
    """
    target = '%s:%d' % (instance, service_port)
    with grpc.insecure_channel(target) as channel:
        stub = health_pb2_grpc.HealthStub(channel)
        return stub.Check(health_pb2.HealthCheckRequest())
1790
1791
def set_serving_status(instances, service_port, serving):
    """Set each test server's serving status via XdsUpdateHealthService.

    For every instance, calls SetServing/SetNotServing and polls the
    standard health service until it reports the desired status, retrying
    up to retry_count times before raising.

    Args:
      instances: iterable of instance hostnames.
      service_port: port the servers listen on.
      serving: True for SERVING, False for NOT_SERVING.

    Raises:
      Exception: if an instance does not reach the desired status.
    """
    logger.info('setting %s serving status to %s', instances, serving)
    for instance in instances:
        with grpc.insecure_channel('%s:%d' %
                                   (instance, service_port)) as channel:
            logger.info('setting %s serving status to %s', instance, serving)
            stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
            retry_count = 5
            # Loop over retry_count itself (previously a hard-coded 5 that
            # could silently diverge from retry_count used below).
            for i in range(retry_count):
                if serving:
                    stub.SetServing(empty_pb2.Empty())
                else:
                    stub.SetNotServing(empty_pb2.Empty())
                serving_status = get_serving_status(instance, service_port)
                logger.info('got instance service status %s', serving_status)
                want_status = health_pb2.HealthCheckResponse.SERVING if serving else health_pb2.HealthCheckResponse.NOT_SERVING
                if serving_status.status == want_status:
                    break
                if i == retry_count - 1:
                    raise Exception(
                        'failed to set instance service status after %d retries'
                        % retry_count)
1814
1815
def is_primary_instance_group(gcp, instance_group):
    """Return True iff all RPC peers reported by the client belong to
    instance_group."""
    # Clients may connect to a TD instance in a different region than the
    # client, in which case primary/secondary assignments may not be based on
    # the client's actual locality — so probe the actual traffic instead.
    group_members = set(get_instance_names(gcp, instance_group))
    client_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    return set(client_stats.rpcs_by_peer.keys()).issubset(group_members)
1823
1824
def get_startup_script(path_to_server_binary, service_port):
    """Return the VM startup script that launches an xDS test server.

    If path_to_server_binary is given, the script simply backgrounds that
    binary on service_port; otherwise it builds and runs the grpc-java
    interop test server from source.
    """
    if not path_to_server_binary:
        # No prebuilt binary available: clone grpc-java, build the interop
        # server with Gradle, and run it in the background.
        return """#!/bin/bash
sudo apt update
sudo apt install -y git default-jdk
mkdir java_server
pushd java_server
git clone https://github.com/grpc/grpc-java.git
pushd grpc-java
pushd interop-testing
../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true

nohup build/install/grpc-interop-testing/bin/xds-test-server \
    --port=%d 1>/dev/null &""" % service_port
    return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary,
                                                 service_port)
1842
1843
def create_instance_template(gcp, name, network, source_image, machine_type,
                             startup_script):
    """Create a GCE instance template and record it in gcp.instance_template.

    Instances stamped from the template get the allow-health-checks tag,
    a NAT'd external interface on `network`, a boot disk from
    `source_image`, and `startup_script` as startup-script metadata.
    """
    properties = {
        'tags': {
            'items': ['allow-health-checks']
        },
        'machineType': machine_type,
        'serviceAccounts': [{
            'email': 'default',
            'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
        }],
        'networkInterfaces': [{
            'accessConfigs': [{
                'type': 'ONE_TO_ONE_NAT'
            }],
            'network': network
        }],
        'disks': [{
            'boot': True,
            'initializeParams': {
                'sourceImage': source_image
            }
        }],
        'metadata': {
            'items': [{
                'key': 'startup-script',
                'value': startup_script
            }]
        }
    }
    template_body = {'name': name, 'properties': properties}

    logger.debug('Sending GCP request with body=%s', template_body)
    result = gcp.compute.instanceTemplates().insert(
        project=gcp.project,
        body=template_body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.instance_template = GcpResource(name, result['targetLink'])
1883
1884
def add_instance_group(gcp, zone, name, size):
    """Create a managed instance group from gcp.instance_template.

    Registers the group in gcp.instance_groups, waits until it reaches the
    requested size, and returns it as an InstanceGroup.
    """
    manager_body = {
        'name': name,
        'instanceTemplate': gcp.instance_template.url,
        'targetSize': size,
        'namedPorts': [{
            'name': 'grpc',
            'port': gcp.service_port
        }]
    }

    logger.debug('Sending GCP request with body=%s', manager_body)
    insert_result = gcp.compute.instanceGroupManagers().insert(
        project=gcp.project, zone=zone,
        body=manager_body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp, zone, insert_result['name'])
    manager = gcp.compute.instanceGroupManagers().get(
        project=gcp.project, zone=zone,
        instanceGroupManager=name).execute(num_retries=_GCP_API_RETRIES)
    instance_group = InstanceGroup(name, manager['instanceGroup'], zone)
    gcp.instance_groups.append(instance_group)
    # The insert operation can finish before the instances exist; block
    # until the group actually contains `size` instances.
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size,
                                                   _WAIT_FOR_OPERATION_SEC)
    return instance_group
1911
1912
def create_health_check(gcp, name):
    """Create a health check and record it in gcp.health_check.

    Uses a native GRPC health check when the alpha compute API is
    available, otherwise a TCP check against the 'grpc' named port.
    """
    config = {
        'name': name,
        'type': 'GRPC' if gcp.alpha_compute else 'TCP',
    }
    if gcp.alpha_compute:
        config['grpcHealthCheck'] = {
            'portSpecification': 'USE_SERVING_PORT'
        }
        compute_to_use = gcp.alpha_compute
    else:
        config['tcpHealthCheck'] = {
            'portName': 'grpc'
        }
        compute_to_use = gcp.compute
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.healthChecks().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.health_check = GcpResource(config['name'], result['targetLink'])
1937
1938
def create_health_check_firewall_rule(gcp, name):
    """Create an ingress firewall rule admitting GCP health checkers.

    Allows TCP from the published health-checking source ranges to
    instances tagged allow-health-checks; records the rule in
    gcp.health_check_firewall_rule.
    """
    firewall_body = {
        'name': name,
        'direction': 'INGRESS',
        'allowed': [{
            'IPProtocol': 'tcp'
        }],
        'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'],
        'targetTags': ['allow-health-checks'],
    }
    logger.debug('Sending GCP request with body=%s', firewall_body)
    request = gcp.compute.firewalls().insert(project=gcp.project,
                                             body=firewall_body)
    result = request.execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.health_check_firewall_rule = GcpResource(firewall_body['name'],
                                                 result['targetLink'])
1955
1956
def add_backend_service(gcp, name):
    """Create an INTERNAL_SELF_MANAGED backend service and return it.

    Uses GRPC protocol with the alpha compute API, HTTP2 otherwise; wires
    in gcp.health_check and appends the new service to gcp.backend_services.
    """
    compute_to_use = gcp.alpha_compute if gcp.alpha_compute else gcp.compute
    service_body = {
        'name': name,
        'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
        'healthChecks': [gcp.health_check.url],
        'portName': 'grpc',
        'protocol': 'GRPC' if gcp.alpha_compute else 'HTTP2'
    }
    logger.debug('Sending GCP request with body=%s', service_body)
    result = compute_to_use.backendServices().insert(
        project=gcp.project,
        body=service_body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    backend_service = GcpResource(service_body['name'], result['targetLink'])
    gcp.backend_services.append(backend_service)
    return backend_service
1978
1979
def create_url_map(gcp, name, backend_service, host_name):
    """Create a url map sending host_name traffic to backend_service.

    The map uses a single path matcher (_PATH_MATCHER_NAME) with
    backend_service as default; the result is stored in gcp.url_map.
    """
    path_matcher = {
        'name': _PATH_MATCHER_NAME,
        'defaultService': backend_service.url,
    }
    host_rule = {
        'hosts': [host_name],
        'pathMatcher': _PATH_MATCHER_NAME
    }
    url_map_body = {
        'name': name,
        'defaultService': backend_service.url,
        'pathMatchers': [path_matcher],
        'hostRules': [host_rule]
    }
    logger.debug('Sending GCP request with body=%s', url_map_body)
    result = gcp.compute.urlMaps().insert(
        project=gcp.project,
        body=url_map_body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.url_map = GcpResource(url_map_body['name'], result['targetLink'])
1998
1999
def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
    """Patch the url map so its host rule matches host_name:service_port.

    backend_service is accepted for call-site compatibility but not used
    by the patch itself.
    """
    host_with_port = '%s:%d' % (host_name, gcp.service_port)
    patch_body = {
        'hostRules': [{
            'hosts': [host_with_port],
            'pathMatcher': _PATH_MATCHER_NAME
        }]
    }
    logger.debug('Sending GCP request with body=%s', patch_body)
    result = gcp.compute.urlMaps().patch(
        project=gcp.project, urlMap=name,
        body=patch_body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
2012
2013
def create_target_proxy(gcp, name, validate_for_proxyless=True):
    """Create a target proxy for gcp.url_map and record it in gcp.target_proxy.

    With the alpha compute API a target gRPC proxy is created (honoring
    validate_for_proxyless); otherwise a target HTTP proxy is created and
    validate_for_proxyless is ignored.
    """
    config = {
        'name': name,
        'url_map': gcp.url_map.url,
    }
    if gcp.alpha_compute:
        config['validate_for_proxyless'] = validate_for_proxyless
        logger.debug('Sending GCP request with body=%s', config)
        result = gcp.alpha_compute.targetGrpcProxies().insert(
            project=gcp.project,
            body=config).execute(num_retries=_GCP_API_RETRIES)
    else:
        logger.debug('Sending GCP request with body=%s', config)
        result = gcp.compute.targetHttpProxies().insert(
            project=gcp.project,
            body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.target_proxy = GcpResource(config['name'], result['targetLink'])
2036
2037
def create_global_forwarding_rule(gcp, name, potential_ports):
    """Create a global forwarding rule on the first usable port.

    Tries each port in potential_ports in order. On success, records the
    rule in gcp.global_forwarding_rule, sets gcp.service_port, and returns.
    A failed port is logged and the next one is tried; if every port fails,
    an error is logged and gcp.service_port is left unset.

    Args:
      gcp: the GcpState holding the compute clients and created resources.
      name: name for the forwarding rule.
      potential_ports: iterable of candidate ports to try.
    """
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute
    for port in potential_ports:
        try:
            config = {
                'name': name,
                'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
                'portRange': str(port),
                'IPAddress': '0.0.0.0',
                'network': args.network,
                'target': gcp.target_proxy.url,
            }
            logger.debug('Sending GCP request with body=%s', config)
            result = compute_to_use.globalForwardingRules().insert(
                project=gcp.project,
                body=config).execute(num_retries=_GCP_API_RETRIES)
            wait_for_global_operation(gcp, result['name'])
            gcp.global_forwarding_rule = GcpResource(config['name'],
                                                     result['targetLink'])
            gcp.service_port = port
            return
        except googleapiclient.errors.HttpError as http_error:
            # Lazy %-args instead of eager %-formatting: the message is
            # only built if WARNING-level logging is enabled.
            logger.warning(
                'Got error %s when attempting to create forwarding rule to '
                '0.0.0.0:%d. Retrying with another port.', http_error, port)
    # Previously this fell through silently, leaving gcp.service_port
    # unset; make the exhaustion explicit in the logs.
    logger.error('Failed to create forwarding rule %s on any port in %s',
                 name, potential_ports)
2066
2067
def get_health_check(gcp, health_check_name):
    """Look up an existing health check and record it on the GCP state."""
    response = gcp.compute.healthChecks().get(
        project=gcp.project, healthCheck=health_check_name).execute()
    gcp.health_check = GcpResource(health_check_name, response['selfLink'])
2072
2073
def get_health_check_firewall_rule(gcp, firewall_name):
    """Look up an existing firewall rule and record it on the GCP state."""
    response = gcp.compute.firewalls().get(project=gcp.project,
                                           firewall=firewall_name).execute()
    gcp.health_check_firewall_rule = GcpResource(firewall_name,
                                                 response['selfLink'])
2079
2080
def get_backend_service(gcp, backend_service_name):
    """Fetch an existing backend service, track it on gcp, and return it."""
    response = gcp.compute.backendServices().get(
        project=gcp.project, backendService=backend_service_name).execute()
    service = GcpResource(backend_service_name, response['selfLink'])
    gcp.backend_services.append(service)
    return service
2087
2088
def get_url_map(gcp, url_map_name):
    """Look up an existing URL map and record it on the GCP state."""
    response = gcp.compute.urlMaps().get(project=gcp.project,
                                         urlMap=url_map_name).execute()
    gcp.url_map = GcpResource(url_map_name, response['selfLink'])
2093
2094
def get_target_proxy(gcp, target_proxy_name):
    """Look up the target proxy and record it on the GCP state.

    Uses the targetGrpcProxies API when the alpha client is available,
    otherwise targetHttpProxies.
    """
    if gcp.alpha_compute:
        response = gcp.alpha_compute.targetGrpcProxies().get(
            project=gcp.project, targetGrpcProxy=target_proxy_name).execute()
    else:
        response = gcp.compute.targetHttpProxies().get(
            project=gcp.project, targetHttpProxy=target_proxy_name).execute()
    gcp.target_proxy = GcpResource(target_proxy_name, response['selfLink'])
2103
2104
def get_global_forwarding_rule(gcp, forwarding_rule_name):
    """Look up an existing global forwarding rule and record it on gcp."""
    response = gcp.compute.globalForwardingRules().get(
        project=gcp.project, forwardingRule=forwarding_rule_name).execute()
    gcp.global_forwarding_rule = GcpResource(forwarding_rule_name,
                                             response['selfLink'])
2110
2111
def get_instance_template(gcp, template_name):
    """Look up an existing instance template and record it on gcp."""
    response = gcp.compute.instanceTemplates().get(
        project=gcp.project, instanceTemplate=template_name).execute()
    gcp.instance_template = GcpResource(template_name, response['selfLink'])
2116
2117
def get_instance_group(gcp, zone, instance_group_name):
    """Fetch an existing instance group, track it on gcp, and return it.

    Side effect: records the group's first named port as gcp.service_port.
    """
    response = gcp.compute.instanceGroups().get(
        project=gcp.project, zone=zone,
        instanceGroup=instance_group_name).execute()
    gcp.service_port = response['namedPorts'][0]['port']
    group = InstanceGroup(instance_group_name, response['selfLink'], zone)
    gcp.instance_groups.append(group)
    return group
2127
2128
def delete_global_forwarding_rule(gcp):
    """Best-effort deletion of the tracked global forwarding rule."""
    try:
        response = gcp.compute.globalForwardingRules().delete(
            project=gcp.project,
            forwardingRule=gcp.global_forwarding_rule.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        # Deletion failures are logged but never fatal during cleanup.
        logger.info('Delete failed: %s', http_error)
2138
2139
def delete_target_proxy(gcp):
    """Best-effort deletion of the tracked target proxy (gRPC or HTTP)."""
    try:
        if gcp.alpha_compute:
            response = gcp.alpha_compute.targetGrpcProxies().delete(
                project=gcp.project,
                targetGrpcProxy=gcp.target_proxy.name).execute(
                    num_retries=_GCP_API_RETRIES)
        else:
            response = gcp.compute.targetHttpProxies().delete(
                project=gcp.project,
                targetHttpProxy=gcp.target_proxy.name).execute(
                    num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2155
2156
def delete_url_map(gcp):
    """Best-effort deletion of the tracked URL map."""
    try:
        response = gcp.compute.urlMaps().delete(
            project=gcp.project,
            urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2165
2166
def delete_backend_service(gcp, backend_service):
    """Best-effort deletion of a single backend service."""
    try:
        response = gcp.compute.backendServices().delete(
            project=gcp.project, backendService=backend_service.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2175
2176
def delete_backend_services(gcp):
    """Delete every backend service tracked on the GCP state."""
    for service in gcp.backend_services:
        delete_backend_service(gcp, service)
2180
2181
def delete_firewall(gcp):
    """Best-effort deletion of the tracked health-check firewall rule."""
    try:
        response = gcp.compute.firewalls().delete(
            project=gcp.project,
            firewall=gcp.health_check_firewall_rule.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2191
2192
def delete_health_check(gcp):
    """Best-effort deletion of the tracked health check."""
    try:
        response = gcp.compute.healthChecks().delete(
            project=gcp.project, healthCheck=gcp.health_check.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2201
2202
def delete_instance_groups(gcp):
    """Best-effort deletion of every tracked managed instance group."""
    for group in gcp.instance_groups:
        try:
            response = gcp.compute.instanceGroupManagers().delete(
                project=gcp.project,
                zone=group.zone,
                instanceGroupManager=group.name).execute(
                    num_retries=_GCP_API_RETRIES)
            # Uses the longer backend timeout rather than the default
            # operation timeout.
            wait_for_zone_operation(gcp,
                                    group.zone,
                                    response['name'],
                                    timeout_sec=_WAIT_FOR_BACKEND_SEC)
        except googleapiclient.errors.HttpError as http_error:
            logger.info('Delete failed: %s', http_error)
2217
2218
def delete_instance_template(gcp):
    """Best-effort deletion of the tracked instance template."""
    try:
        response = gcp.compute.instanceTemplates().delete(
            project=gcp.project,
            instanceTemplate=gcp.instance_template.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2228
2229
def patch_backend_service(gcp,
                          backend_service,
                          instance_groups,
                          balancing_mode='UTILIZATION',
                          max_rate=1,
                          circuit_breakers=None):
    """Replace backend_service's backends with the given instance groups.

    maxRate is only meaningful (non-None) when balancing_mode is 'RATE';
    circuit_breakers is passed through verbatim and may be None.
    """
    compute_to_use = gcp.alpha_compute if gcp.alpha_compute else gcp.compute
    backends = [{
        'group': group.url,
        'balancingMode': balancing_mode,
        'maxRate': max_rate if balancing_mode == 'RATE' else None
    } for group in instance_groups]
    request_body = {
        'backends': backends,
        'circuitBreakers': circuit_breakers,
    }
    logger.debug('Sending GCP request with body=%s', request_body)
    response = compute_to_use.backendServices().patch(
        project=gcp.project, backendService=backend_service.name,
        body=request_body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp,
                              response['name'],
                              timeout_sec=_WAIT_FOR_BACKEND_SEC)
2255
2256
def resize_instance_group(gcp,
                          instance_group,
                          new_size,
                          timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Resize a managed instance group and wait for it to reach new_size.

    The resize operation itself is waited on with a fixed 360s timeout;
    timeout_sec only bounds the subsequent wait for the group to actually
    reach the requested size.
    """
    response = gcp.compute.instanceGroupManagers().resize(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroupManager=instance_group.name,
        size=new_size).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp,
                            instance_group.zone,
                            response['name'],
                            timeout_sec=360)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   new_size, timeout_sec)
2272
2273
2274 def patch_url_map_backend_service(gcp,
2275                                   backend_service=None,
2276                                   services_with_weights=None,
2277                                   route_rules=None):
2278     '''change url_map's backend service
2279
2280     Only one of backend_service and service_with_weights can be not None.
2281     '''
2282     if gcp.alpha_compute:
2283         compute_to_use = gcp.alpha_compute
2284     else:
2285         compute_to_use = gcp.compute
2286
2287     if backend_service and services_with_weights:
2288         raise ValueError(
2289             'both backend_service and service_with_weights are not None.')
2290
2291     default_service = backend_service.url if backend_service else None
2292     default_route_action = {
2293         'weightedBackendServices': [{
2294             'backendService': service.url,
2295             'weight': w,
2296         } for service, w in services_with_weights.items()]
2297     } if services_with_weights else None
2298
2299     config = {
2300         'pathMatchers': [{
2301             'name': _PATH_MATCHER_NAME,
2302             'defaultService': default_service,
2303             'defaultRouteAction': default_route_action,
2304             'routeRules': route_rules,
2305         }]
2306     }
2307     logger.debug('Sending GCP request with body=%s', config)
2308     result = compute_to_use.urlMaps().patch(
2309         project=gcp.project, urlMap=gcp.url_map.name,
2310         body=config).execute(num_retries=_GCP_API_RETRIES)
2311     wait_for_global_operation(gcp, result['name'])
2312
2313
def wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   expected_size, timeout_sec):
    """Poll until the instance group contains exactly expected_size
    instances, raising if that does not happen within timeout_sec."""
    deadline = time.time() + timeout_sec
    while True:
        current_size = len(get_instance_names(gcp, instance_group))
        if current_size == expected_size:
            return
        if time.time() > deadline:
            raise Exception(
                'Instance group had expected size %d but actual size %d' %
                (expected_size, current_size))
        time.sleep(2)
2326
2327
def wait_for_global_operation(gcp,
                              operation,
                              timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Block until the named global operation reaches DONE.

    Raises an Exception carrying the operation's error payload if it
    finishes with an error, or a timeout Exception if it does not
    complete within timeout_sec.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        result = gcp.compute.globalOperations().get(
            project=gcp.project,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if result['status'] == 'DONE':
            if 'error' in result:
                raise Exception(result['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
2343
2344
def wait_for_zone_operation(gcp,
                            zone,
                            operation,
                            timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Block until the named zonal operation reaches DONE.

    Raises an Exception carrying the operation's error payload if it
    finishes with an error, or a timeout Exception if it does not
    complete within timeout_sec.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        result = gcp.compute.zoneOperations().get(
            project=gcp.project, zone=zone,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if result['status'] == 'DONE':
            if 'error' in result:
                raise Exception(result['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
2361
2362
def wait_for_healthy_backends(gcp,
                              backend_service,
                              instance_group,
                              timeout_sec=_WAIT_FOR_BACKEND_SEC):
    """Poll GCP until every instance in instance_group reports HEALTHY.

    Each round also probes every instance's gRPC serving status (results
    are logged only; RPC failures do not abort the wait). Raises if the
    backends do not all become healthy within timeout_sec seconds.
    """
    deadline = time.time() + timeout_sec
    config = {'group': instance_group.url}
    instance_names = get_instance_names(gcp, instance_group)
    expected_size = len(instance_names)
    while time.time() <= deadline:
        for instance_name in instance_names:
            try:
                status = get_serving_status(instance_name, gcp.service_port)
                logger.info('serving status response from %s: %s',
                            instance_name, status)
            except grpc.RpcError as rpc_error:
                logger.info('checking serving status of %s failed: %s',
                            instance_name, rpc_error)
        result = gcp.compute.backendServices().getHealth(
            project=gcp.project,
            backendService=backend_service.name,
            body=config).execute(num_retries=_GCP_API_RETRIES)
        if 'healthStatus' in result:
            logger.info('received GCP healthStatus: %s', result['healthStatus'])
            healthy = all(instance['healthState'] == 'HEALTHY'
                          for instance in result['healthStatus'])
            # Require both all-healthy and a full-size status list.
            if healthy and expected_size == len(result['healthStatus']):
                return
        else:
            logger.info('no healthStatus received from GCP')
        time.sleep(5)
    raise Exception('Not all backends became healthy within %d seconds: %s' %
                    (timeout_sec, result))
2398
2399
def get_instance_names(gcp, instance_group):
    """Return the short names of all instances in the given group."""
    response = gcp.compute.instanceGroups().listInstances(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroup=instance_group.name,
        body={
            'instanceState': 'ALL'
        }).execute(num_retries=_GCP_API_RETRIES)
    if 'items' not in response:
        return []
    # listInstances() returns each instance's full URL, which ends with
    # the instance name; compute.instances().get() wants just the name,
    # so extract the final path component.
    names = [item['instance'].split('/')[-1] for item in response['items']]
    logger.info('retrieved instance names: %s', names)
    return names
2420
2421
def clean_up(gcp):
    """Best-effort teardown of every GCP resource tracked on gcp.

    Deletion happens roughly in reverse order of creation so that
    dependent resources (e.g. the forwarding rule that targets the
    proxy) are removed first. Individual delete_* helpers swallow and
    log HttpErrors, so cleanup always runs to completion.
    """
    if gcp.global_forwarding_rule:
        delete_global_forwarding_rule(gcp)
    if gcp.target_proxy:
        delete_target_proxy(gcp)
    if gcp.url_map:
        delete_url_map(gcp)
    # backend_services is a list; delete_backend_services iterates it, so
    # no None-check is needed.
    delete_backend_services(gcp)
    if gcp.health_check_firewall_rule:
        delete_firewall(gcp)
    if gcp.health_check:
        delete_health_check(gcp)
    # instance_groups is likewise a (possibly empty) list.
    delete_instance_groups(gcp)
    if gcp.instance_template:
        delete_instance_template(gcp)
2437
2438
class InstanceGroup(object):
    """A GCE managed instance group: its name, resource URL, and zone."""

    def __init__(self, name, url, zone):
        self.name, self.url, self.zone = name, url, zone
2445
2446
class GcpResource(object):
    """A named GCP resource together with its self-link URL."""

    def __init__(self, name, url):
        self.name, self.url = name, url
2452
2453
class GcpState(object):
    """Mutable registry of the GCP resources used by one test run.

    Resource attributes start as None (or empty lists) and are filled in
    by the create_*/get_*/add_* helpers as resources come into existence.
    """

    def __init__(self, compute, alpha_compute, project, project_num):
        # API clients and project identifiers.
        self.compute, self.alpha_compute = compute, alpha_compute
        self.project, self.project_num = project, project_num
        # Load-balancing resources, unset until created/fetched.
        self.health_check = self.health_check_firewall_rule = None
        self.url_map = self.target_proxy = None
        self.global_forwarding_rule = self.service_port = None
        self.instance_template = None
        # Collections of resources (may hold several entries each).
        self.backend_services = []
        self.instance_groups = []
2470
2471
# Build the GCP Compute API clients. If a local discovery document was
# supplied on the command line, the client is constructed from it;
# otherwise the standard discovery service is used.
alpha_compute = None
if args.compute_discovery_document:
    with open(args.compute_discovery_document, 'r') as discovery_doc:
        compute = googleapiclient.discovery.build_from_document(
            discovery_doc.read())
    # The alpha client is only built when non-stable APIs are allowed
    # AND an alpha discovery document was provided.
    if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document:
        with open(args.alpha_compute_discovery_document, 'r') as discovery_doc:
            alpha_compute = googleapiclient.discovery.build_from_document(
                discovery_doc.read())
else:
    compute = googleapiclient.discovery.build('compute', 'v1')
    if not args.only_stable_gcp_apis:
        alpha_compute = googleapiclient.discovery.build('compute', 'alpha')
2485
2486 try:
2487     gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num)
2488     gcp_suffix = args.gcp_suffix
2489     health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
2490     if not args.use_existing_gcp_resources:
2491         if args.keep_gcp_resources:
2492             # Auto-generating a unique suffix in case of conflict should not be
2493             # combined with --keep_gcp_resources, as the suffix actually used
2494             # for GCP resources will not match the provided --gcp_suffix value.
2495             num_attempts = 1
2496         else:
2497             num_attempts = 5
2498         for i in range(num_attempts):
2499             try:
2500                 logger.info('Using GCP suffix %s', gcp_suffix)
2501                 create_health_check(gcp, health_check_name)
2502                 break
2503             except googleapiclient.errors.HttpError as http_error:
2504                 gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999))
2505                 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
2506                 logger.exception('HttpError when creating health check')
2507         if gcp.health_check is None:
2508             raise Exception('Failed to create health check name after %d '
2509                             'attempts' % num_attempts)
2510     firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix
2511     backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix
2512     alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix
2513     url_map_name = _BASE_URL_MAP_NAME + gcp_suffix
2514     service_host_name = _BASE_SERVICE_HOST + gcp_suffix
2515     target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix
2516     forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix
2517     template_name = _BASE_TEMPLATE_NAME + gcp_suffix
2518     instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix
2519     same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix
2520     secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix
2521     if args.use_existing_gcp_resources:
2522         logger.info('Reusing existing GCP resources')
2523         get_health_check(gcp, health_check_name)
2524         try:
2525             get_health_check_firewall_rule(gcp, firewall_name)
2526         except googleapiclient.errors.HttpError as http_error:
2527             # Firewall rule may be auto-deleted periodically depending on GCP
2528             # project settings.
2529             logger.exception('Failed to find firewall rule, recreating')
2530             create_health_check_firewall_rule(gcp, firewall_name)
2531         backend_service = get_backend_service(gcp, backend_service_name)
2532         alternate_backend_service = get_backend_service(
2533             gcp, alternate_backend_service_name)
2534         get_url_map(gcp, url_map_name)
2535         get_target_proxy(gcp, target_proxy_name)
2536         get_global_forwarding_rule(gcp, forwarding_rule_name)
2537         get_instance_template(gcp, template_name)
2538         instance_group = get_instance_group(gcp, args.zone, instance_group_name)
2539         same_zone_instance_group = get_instance_group(
2540             gcp, args.zone, same_zone_instance_group_name)
2541         secondary_zone_instance_group = get_instance_group(
2542             gcp, args.secondary_zone, secondary_zone_instance_group_name)
2543     else:
2544         create_health_check_firewall_rule(gcp, firewall_name)
2545         backend_service = add_backend_service(gcp, backend_service_name)
2546         alternate_backend_service = add_backend_service(
2547             gcp, alternate_backend_service_name)
2548         create_url_map(gcp, url_map_name, backend_service, service_host_name)
2549         create_target_proxy(gcp, target_proxy_name)
2550         potential_service_ports = list(args.service_port_range)
2551         random.shuffle(potential_service_ports)
2552         create_global_forwarding_rule(gcp, forwarding_rule_name,
2553                                       potential_service_ports)
2554         if not gcp.service_port:
2555             raise Exception(
2556                 'Failed to find a valid ip:port for the forwarding rule')
2557         if gcp.service_port != _DEFAULT_SERVICE_PORT:
2558             patch_url_map_host_rule_with_port(gcp, url_map_name,
2559                                               backend_service,
2560                                               service_host_name)
2561         startup_script = get_startup_script(args.path_to_server_binary,
2562                                             gcp.service_port)
2563         create_instance_template(gcp, template_name, args.network,
2564                                  args.source_image, args.machine_type,
2565                                  startup_script)
2566         instance_group = add_instance_group(gcp, args.zone, instance_group_name,
2567                                             _INSTANCE_GROUP_SIZE)
2568         patch_backend_service(gcp, backend_service, [instance_group])
2569         same_zone_instance_group = add_instance_group(
2570             gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE)
2571         secondary_zone_instance_group = add_instance_group(
2572             gcp, args.secondary_zone, secondary_zone_instance_group_name,
2573             _INSTANCE_GROUP_SIZE)
2574
2575     wait_for_healthy_backends(gcp, backend_service, instance_group)
2576
2577     if args.test_case:
2578         client_env = dict(os.environ)
2579         bootstrap_server_features = []
2580         if gcp.service_port == _DEFAULT_SERVICE_PORT:
2581             server_uri = service_host_name
2582         else:
2583             server_uri = service_host_name + ':' + str(gcp.service_port)
2584         if args.xds_v3_support:
2585             client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true'
2586             bootstrap_server_features.append('xds_v3')
2587         if args.bootstrap_file:
2588             bootstrap_path = os.path.abspath(args.bootstrap_file)
2589         else:
2590             with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
2591                 bootstrap_file.write(
2592                     _BOOTSTRAP_TEMPLATE.format(
2593                         node_id='projects/%s/networks/%s/nodes/%s' %
2594                         (gcp.project_num, args.network.split('/')[-1],
2595                          uuid.uuid1()),
2596                         server_features=json.dumps(
2597                             bootstrap_server_features)).encode('utf-8'))
2598                 bootstrap_path = bootstrap_file.name
2599         client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path
2600         client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true'
2601         client_env['GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT'] = 'true'
2602         client_env['GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION'] = 'true'
2603         test_results = {}
2604         failed_tests = []
2605         for test_case in args.test_case:
2606             if test_case in _V3_TEST_CASES and not args.xds_v3_support:
2607                 logger.info('skipping test %s due to missing v3 support',
2608                             test_case)
2609                 continue
2610             if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute:
2611                 logger.info('skipping test %s due to missing alpha support',
2612                             test_case)
2613                 continue
2614             result = jobset.JobResult()
2615             log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
2616             if not os.path.exists(log_dir):
2617                 os.makedirs(log_dir)
2618             test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
2619             test_log_file = open(test_log_filename, 'w+')
2620             client_process = None
2621
2622             if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
2623                 rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
2624             else:
2625                 rpcs_to_send = '--rpc="UnaryCall"'
2626
2627             if test_case in _TESTS_TO_SEND_METADATA:
2628                 metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format(
2629                     keyE=_TEST_METADATA_KEY,
2630                     valueE=_TEST_METADATA_VALUE_EMPTY,
2631                     keyU=_TEST_METADATA_KEY,
2632                     valueU=_TEST_METADATA_VALUE_UNARY,
2633                     keyNU=_TEST_METADATA_NUMERIC_KEY,
2634                     valueNU=_TEST_METADATA_NUMERIC_VALUE)
2635             else:
2636                 # Setting the arg explicitly to empty with '--metadata=""'
2637                 # makes C# client fail
2638                 # (see https://github.com/commandlineparser/commandline/issues/412),
2639                 # so instead we just rely on clients using the default when
2640                 # metadata arg is not specified.
2641                 metadata_to_send = ''
2642
2643             # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks
2644             # in the client. This means we will ignore intermittent RPC
2645             # failures (but this framework still checks that the final result
2646             # is as expected).
2647             #
2648             # Reason for disabling this is, the resources are shared by
2649             # multiple tests, and a change in previous test could be delayed
2650             # until the second test starts. The second test may see
2651             # intermittent failures because of that.
2652             #
2653             # A fix is to not share resources between tests (though that does
2654             # mean the tests will be significantly slower due to creating new
2655             # resources).
2656             fail_on_failed_rpc = ''
2657
2658             try:
2659                 if not CLIENT_HOSTS:
2660                     client_cmd_formatted = args.client_cmd.format(
2661                         server_uri=server_uri,
2662                         stats_port=args.stats_port,
2663                         qps=args.qps,
2664                         fail_on_failed_rpc=fail_on_failed_rpc,
2665                         rpcs_to_send=rpcs_to_send,
2666                         metadata_to_send=metadata_to_send)
2667                     logger.debug('running client: %s', client_cmd_formatted)
2668                     client_cmd = shlex.split(client_cmd_formatted)
2669                     client_process = subprocess.Popen(client_cmd,
2670                                                       env=client_env,
2671                                                       stderr=subprocess.STDOUT,
2672                                                       stdout=test_log_file)
2673                 if test_case == 'backends_restart':
2674                     test_backends_restart(gcp, backend_service, instance_group)
2675                 elif test_case == 'change_backend_service':
2676                     test_change_backend_service(gcp, backend_service,
2677                                                 instance_group,
2678                                                 alternate_backend_service,
2679                                                 same_zone_instance_group)
2680                 elif test_case == 'gentle_failover':
2681                     test_gentle_failover(gcp, backend_service, instance_group,
2682                                          secondary_zone_instance_group)
2683                 elif test_case == 'load_report_based_failover':
2684                     test_load_report_based_failover(
2685                         gcp, backend_service, instance_group,
2686                         secondary_zone_instance_group)
2687                 elif test_case == 'ping_pong':
2688                     test_ping_pong(gcp, backend_service, instance_group)
2689                 elif test_case == 'remove_instance_group':
2690                     test_remove_instance_group(gcp, backend_service,
2691                                                instance_group,
2692                                                same_zone_instance_group)
2693                 elif test_case == 'round_robin':
2694                     test_round_robin(gcp, backend_service, instance_group)
2695                 elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure':
2696                     test_secondary_locality_gets_no_requests_on_partial_primary_failure(
2697                         gcp, backend_service, instance_group,
2698                         secondary_zone_instance_group)
2699                 elif test_case == 'secondary_locality_gets_requests_on_primary_failure':
2700                     test_secondary_locality_gets_requests_on_primary_failure(
2701                         gcp, backend_service, instance_group,
2702                         secondary_zone_instance_group)
2703                 elif test_case == 'traffic_splitting':
2704                     test_traffic_splitting(gcp, backend_service, instance_group,
2705                                            alternate_backend_service,
2706                                            same_zone_instance_group)
2707                 elif test_case == 'path_matching':
2708                     test_path_matching(gcp, backend_service, instance_group,
2709                                        alternate_backend_service,
2710                                        same_zone_instance_group)
2711                 elif test_case == 'header_matching':
2712                     test_header_matching(gcp, backend_service, instance_group,
2713                                          alternate_backend_service,
2714                                          same_zone_instance_group)
2715                 elif test_case == 'circuit_breaking':
2716                     test_circuit_breaking(gcp, backend_service, instance_group,
2717                                           same_zone_instance_group)
2718                 elif test_case == 'timeout':
2719                     test_timeout(gcp, backend_service, instance_group)
2720                 elif test_case == 'fault_injection':
2721                     test_fault_injection(gcp, backend_service, instance_group)
2722                 else:
2723                     logger.error('Unknown test case: %s', test_case)
2724                     sys.exit(1)
2725                 if client_process and client_process.poll() is not None:
2726                     raise Exception(
2727                         'Client process exited prematurely with exit code %d' %
2728                         client_process.returncode)
2729                 result.state = 'PASSED'
2730                 result.returncode = 0
2731             except Exception as e:
2732                 logger.exception('Test case %s failed', test_case)
2733                 failed_tests.append(test_case)
2734                 result.state = 'FAILED'
2735                 result.message = str(e)
2736             finally:
2737                 if client_process:
2738                     if client_process.returncode:
2739                         logger.info('Client exited with code %d' %
2740                                     client_process.returncode)
2741                     else:
2742                         client_process.terminate()
2743                 test_log_file.close()
2744                 # Workaround for Python 3, as report_utils will invoke decode() on
2745                 # result.message, which has a default value of ''.
2746                 result.message = result.message.encode('UTF-8')
2747                 test_results[test_case] = [result]
2748                 if args.log_client_output:
2749                     logger.info('Client output:')
2750                     with open(test_log_filename, 'r') as client_output:
2751                         logger.info(client_output.read())
2752         if not os.path.exists(_TEST_LOG_BASE_DIR):
2753             os.makedirs(_TEST_LOG_BASE_DIR)
2754         report_utils.render_junit_xml_report(test_results,
2755                                              os.path.join(
2756                                                  _TEST_LOG_BASE_DIR,
2757                                                  _SPONGE_XML_NAME),
2758                                              suite_name='xds_tests',
2759                                              multi_target=True)
2760         if failed_tests:
2761             logger.error('Test case(s) %s failed', failed_tests)
2762             sys.exit(1)
2763 finally:
2764     if not args.keep_gcp_resources:
2765         logger.info('Cleaning up GCP resources. This may take some time.')
2766         clean_up(gcp)