Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / src / php / tests / interop / xds_client.php
1 <?php
2 /*
3  *
4  * Copyright 2020 gRPC authors.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  */
19
20 /**
21  * This is the PHP xDS Interop test client. This script is meant to be run by
22  * the main xDS Interep test runner "run_xds_tests.py", not to be run
23  * by itself standalone.
24  */
25 $autoload_path = realpath(dirname(__FILE__).'/../../vendor/autoload.php');
26 require_once $autoload_path;
27
28 class XdsUpdateClientConfigureService
29         extends \Grpc\Testing\XdsUpdateClientConfigureServiceStub
30 {
31     function configure(
32         \Grpc\Testing\ClientConfigureRequest $request,
33         \Grpc\ServerContext $context
34     ): ?\Grpc\Testing\ClientConfigureResponse {
35         $rpc_types = $request->getTypes();
36         $all_metadata = $request->getMetadata();
37         $rpcs_to_send = [];
38         foreach ($rpc_types as $rpc_type) {
39             if ($rpc_type ==
40                 \Grpc\Testing\ClientConfigureRequest\RpcType::EMPTY_CALL) {
41                 $rpcs_to_send[] = 'EmptyCall';
42             } else if ($rpc_type ==
43                        \Grpc\Testing\ClientConfigureRequest\RpcType::UNARY_CALL) {
44                 $rpcs_to_send[] = 'UnaryCall';
45             }
46         }
47         $metadata_to_send = [];
48         foreach ($all_metadata as $metadata) {
49             $rpc_type = $metadata->getType();
50             if ($rpc_type ==
51                 \Grpc\Testing\ClientConfigureRequest\RpcType::EMPTY_CALL) {
52                 $rpc_type_key = 'EmptyCall';
53             } else if ($rpc_type ==
54                        \Grpc\Testing\ClientConfigureRequest\RpcType::UNARY_CALL) {
55                 $rpc_type_key = 'UnaryCall';
56             }
57             $key = $metadata->getKey();
58             $value = $metadata->getValue();
59             if (!isset($metadata_to_send[$rpc_type_key])) {
60                 $metadata_to_send[$rpc_type_key] = [];
61             }
62             $metadata_to_send[$rpc_type_key][$key] = $value;
63         }
64         global $client_thread;
65         echo "PHP parent: Setting client_thread rpc_config to \n";
66         print_r($rpcs_to_send);
67         print_r($metadata_to_send);
68         echo "PHP parent: timeout_sec = ".$request->getTimeoutSec()."\n";
69         $client_thread->rpc_config->update($rpcs_to_send,
70                                            $metadata_to_send,
71                                            $request->getTimeoutSec());
72         return new Grpc\Testing\ClientConfigureResponse();
73     }
74 }
75
76 // The main xds interop test runner will ping this service to ask for
77 // the stats of the distribution of the backends, for the next X rpcs.
78 class LoadBalancerStatsService
79     extends \Grpc\Testing\LoadBalancerStatsServiceStub
80 {
81     function getClientStats(
82         \Grpc\Testing\LoadBalancerStatsRequest $request,
83         \Grpc\ServerContext $context
84     ): ?\Grpc\Testing\LoadBalancerStatsResponse {
85         $num_rpcs = $request->getNumRpcs();
86         $timeout_sec = $request->getTimeoutSec();
87         $rpcs_by_method = [];
88         $rpcs_by_peer = [];
89         $num_failures = 0;
90
91         // Heavy limitation now: the server is blocking, until all
92         // the necessary num_rpcs are finished, or timeout is reached
93         global $client_thread;
94         $start_id = $client_thread->num_results + 1;
95         $end_id = $start_id + $num_rpcs;
96         $now = hrtime(true);
97         $timeout = $now[0] + ($now[1] / 1e9) + $timeout_sec;
98         while (true) {
99             $curr_hr = hrtime(true);
100             $curr_time = $curr_hr[0] + ($curr_hr[1] / 1e9);
101             if ($curr_time > $timeout) {
102                 break;
103             }
104             // Thread variable seems to be read-only
105             $curr_id = $client_thread->num_results;
106             if ($curr_id >= $end_id) {
107                 break;
108             }
109             usleep(50000);
110         }
111
112         // Tally up results
113         $end_id = min($end_id, $client_thread->num_results);
114         // "$client_thread->results" will be in the form of
115         // [
116         //   'rpc1' => [
117         //     'hostname1', '', 'hostname2', 'hostname1', '', ...
118         //   ],
119         //   'rpc2' => [
120         //     '', 'hostname1', 'hostname2', '', 'hostname2', ...
121         //   ],
122         // ]
123         foreach ((array)$client_thread->rpc_config->rpcs_to_send as $rpc) {
124             $results = $client_thread->results[$rpc];
125             // initialize, can always start from scratch here
126             $rpcs_by_method[$rpc] = [];
127             for ($i = $start_id; $i < $end_id; $i++) {
128                 $hostname = $results[$i];
129                 if ($hostname) {
130                     // initialize in case we haven't seen this hostname
131                     // before
132                     if (!array_key_exists($hostname, $rpcs_by_method[$rpc])) {
133                         $rpcs_by_method[$rpc][$hostname] = 0;
134                     }
135                     if (!array_key_exists($hostname, $rpcs_by_peer)) {
136                         $rpcs_by_peer[$hostname] = 0;
137                     }
138                     // increment the remote hostname distribution histogram
139                     // both by overall, and broken down per RPC
140                     $rpcs_by_method[$rpc][$hostname] += 1;
141                     $rpcs_by_peer[$hostname] += 1;
142                 } else {
143                     // $num_failures here are counted per individual RPC
144                     $num_failures += 1;
145                 }
146             }
147         }
148
149         // Convert our hashmaps above into protobuf objects
150         $response = new Grpc\Testing\LoadBalancerStatsResponse();
151         $rpcs_by_method_map = [];
152         foreach ($rpcs_by_method as $rpc => $rpcs_by_peer_per_method) {
153             $rpcs_by_peer_proto_obj
154                 = new Grpc\Testing\LoadBalancerStatsResponse\RpcsByPeer();
155             $rpcs_by_peer_proto_obj->setRpcsByPeer($rpcs_by_peer_per_method);
156             $rpcs_by_method_map[$rpc] = $rpcs_by_peer_proto_obj;
157         }
158         $response->setRpcsByPeer($rpcs_by_peer);
159         $response->setRpcsByMethod($rpcs_by_method_map);
160         $response->setNumFailures($num_failures);
161         return $response;
162     }
163
164     function GetClientAccumulatedStats(
165         \Grpc\Testing\LoadBalancerAccumulatedStatsRequest $request,
166         \Grpc\ServerContext $context
167     ): ?\Grpc\Testing\LoadBalancerAccumulatedStatsResponse {
168         global $client_thread;
169         $response = new Grpc\Testing\LoadBalancerAccumulatedStatsResponse();
170         $response->setNumRpcsStartedByMethod(
171             (array)$client_thread->num_rpcs_started_by_method);
172         $response->setNumRpcsSucceededByMethod(
173             (array)$client_thread->num_rpcs_succeeded_by_method);
174         $response->setNumRpcsFailedByMethod(
175             (array)$client_thread->num_rpcs_failed_by_method);
176         $accumulated_method_stats
177             = (array)$client_thread->accumulated_method_stats;
178         $stats_per_method = [];
179         foreach ($accumulated_method_stats as $rpc_name => $stats) {
180             $methodStats
181                 = new Grpc\Testing\LoadBalancerAccumulatedStatsResponse\MethodStats();
182             $methodStats->setRpcsStarted($stats['rpcs_started']);
183             $methodStats->setResult((array)$stats['result']);
184             $stats_per_method[$rpc_name] = $methodStats;
185         }
186         $response->setStatsPerMethod($stats_per_method);
187         return $response;
188     }
189 }
190
191 class RpcConfig extends Volatile {
192     public $server_address;
193     public $qps;
194     public $fail_on_failed_rpcs;
195     public $rpcs_to_send;
196     public $metadata_to_send;
197     public $tmp_file1;
198     public $tmp_file2;
199     public $timeout_sec;
200     public function __construct($server_address,
201                                 $qps,
202                                 $fail_on_failed_rpcs,
203                                 $rpcs_to_send,
204                                 $metadata_to_send,
205                                 $tmp_file1,
206                                 $tmp_file2) {
207         $this->server_address = $server_address;
208         $this->qps = $qps;
209         $this->fail_on_failed_rpcs = $fail_on_failed_rpcs;
210         $this->rpcs_to_send = (array)$rpcs_to_send;
211         $this->metadata_to_send = (array)$metadata_to_send;
212         $this->tmp_file1 = $tmp_file1;
213         $this->tmp_file2 = $tmp_file2;
214         $this->timeout_sec = 30;
215     }
216     public function update($rpcs_to_send, $metadata_to_send, $timeout_sec) {
217         $this->rpcs_to_send = (array)$rpcs_to_send;
218         $this->metadata_to_send = (array)$metadata_to_send;
219         $this->timeout_sec = $timeout_sec;
220     }
221 }
222
223 // This client thread blindly sends a unary RPC to the server once
224 // every 1 / qps seconds.
225 class ClientThread extends Thread {
226     private $target_seconds_between_rpcs_;
227     private $autoload_path_;
228     private $TIMEOUT_US = 30 * 1e6; // 30 seconds
229     public $rpc_config;
230     public $num_results = 0;
231     public $results;
232
233     public $RPC_MAP = [
234         'UnaryCall' => 'UNARY_CALL',
235         'EmptyCall' => 'EMPTY_CALL',
236     ];
237
238     public $num_rpcs_started_by_method = [];
239     public $num_rpcs_succeeded_by_method = [];
240     public $num_rpcs_failed_by_method = [];
241     public $accumulated_method_stats = [];
242
243     public function __construct($rpc_config,
244                                 $autoload_path) {
245         $this->rpc_config = $rpc_config;
246         $this->target_seconds_between_rpcs_ = 1.0 / $rpc_config->qps;
247         $this->autoload_path_ = $autoload_path;
248         $this->simple_request = new Grpc\Testing\SimpleRequest();
249         $this->empty_request = new Grpc\Testing\EmptyMessage();
250         $this->results = [];
251         foreach (['UnaryCall', 'EmptyCall'] as $rpc) {
252             $this->results[$rpc] = [];
253         }
254         $this->outstanding_rpcs = [];
255         foreach (['UNARY_CALL', 'EMPTY_CALL'] as $rpc_stats_key) {
256             $this->num_rpcs_started_by_method[$rpc_stats_key] = 0;
257             $this->num_rpcs_succeeded_by_method[$rpc_stats_key] = 0;
258             $this->num_rpcs_failed_by_method[$rpc_stats_key] = 0;
259             $this->accumulated_method_stats[$rpc_stats_key] = [
260                 'rpcs_started' => 0,
261                 'result' => [],
262             ];
263         }
264     }
265
266     public function sendUnaryCall($stub, $metadata) {
267         $timeout = $this->rpc_config->timeout_sec ?
268                  $this->rpc_config->timeout_sec * 1e6 :
269                  $this->TIMEOUT_US;
270         return $stub->UnaryCall($this->simple_request,
271                                 $metadata,
272                                 ['timeout' => $timeout]);
273     }
274
275     public function sendEmptyCall($stub, $metadata) {
276         $timeout = $this->rpc_config->timeout_sec ?
277                  $this->rpc_config->timeout_sec * 1e6 :
278                  $this->TIMEOUT_US;
279         return $stub->EmptyCall($this->empty_request,
280                                 $metadata,
281                                 ['timeout' => $timeout]);
282     }
283
284     public function add_rpc_result($rpc, $status_code) {
285         // $rpc here needs to be in the format of 'UnaryCall', 'EmptyCall'
286         if (!isset($this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
287                    ['result'][$status_code])) {
288             $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
289                 ['result'][$status_code] = 0;
290         }
291         $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
292             ['result'][$status_code] += 1;
293     }
294
295     public function check_child_process_result() {
296         if (sizeof($this->outstanding_rpcs) > 0 &&
297             $this->rpc_config->tmp_file2) {
298             $keys_to_delete = [];
299             // tmp_file2 contains the RPC result of each RPC we
300             // originally wrote to tmp_file1
301             $f2 = fopen($this->rpc_config->tmp_file2, 'r+');
302             flock($f2, LOCK_EX);
303             while (true) {
304                 $f2_line = fgets($f2);
305                 if (!$f2_line) {
306                     break;
307                 }
308                 // format here needs to be in sync with
309                 // src/php/bin/xds_manager.py
310                 $parts = explode(',', trim($f2_line));
311                 $key = $parts[0];
312                 $returncode = $parts[1];
313                 if (isset($this->outstanding_rpcs[$key])) {
314                     $parts2 = explode('|', $key);
315                     $result_num = $parts2[0];
316                     $rpc_name = $parts2[1];
317                     // Child processes can only communicate back the
318                     // status code for now.
319                     // Current interop test specs only call for
320                     // reporting back the status code in these scenarios.
321                     // If we ever need the hostname reported back from
322                     // child processes, we need to enhance this
323                     // communication framework through tmp files.
324                     $this->results[$rpc_name][$result_num] = "";
325                     if ($returncode) {
326                         $this->num_rpcs_failed_by_method
327                             [$this->RPC_MAP[$rpc_name]] += 1;
328                     } else {
329                         $this->num_rpcs_succeeded_by_method
330                             [$this->RPC_MAP[$rpc_name]] += 1;
331                     }
332                     $this->add_rpc_result($rpc_name, $returncode);
333                     $keys_to_delete[] = $key;
334                 }
335             }
336             foreach ($keys_to_delete as $key) {
337                 unset($this->outstanding_rpcs[$key]);
338             }
339             ftruncate($f2, 0);
340             flock($f2, LOCK_UN);
341             fclose($f2);
342         }
343     }
344
345     public function execute_rpc_in_child_process($rpc, $metadata_serialized) {
346         // tmp_file1 contains the list of RPCs (and their
347         // specs) we want executed. This will be picked up
348         // by src/php/bin/xds_manager.py
349         $f1 = fopen($this->rpc_config->tmp_file1, 'a');
350         $key = implode('|', [$this->num_results,
351                              $rpc,
352                              $metadata_serialized,
353                              $this->rpc_config->timeout_sec]);
354         flock($f1, LOCK_EX);
355         fwrite($f1, $key."\n");
356         fflush($f1);
357         flock($f1, LOCK_UN);
358         fclose($f1);
359         $this->outstanding_rpcs[$key] = 1;
360         $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1;
361         $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
362             ['rpcs_started'] += 1;
363     }
364
365     public function run() {
366         // Autoloaded classes do not get inherited in threads.
367         // Hence we need to do this.
368         require_once($this->autoload_path_);
369
370         $stub = new Grpc\Testing\TestServiceClient(
371             $this->rpc_config->server_address,
372             ['credentials' => Grpc\ChannelCredentials::createInsecure()
373         ]);
374         // hrtime returns nanoseconds
375         $target_next_start_us = hrtime(true) / 1000;
376         while (true) {
377             $now_us = hrtime(true) / 1000;
378             $sleep_us = $target_next_start_us - $now_us;
379             if ($sleep_us < 0) {
380                 $target_next_start_us =
381                         $now_us + ($this->target_seconds_between_rpcs_ * 1e6);
382             } else {
383                 $target_next_start_us +=
384                         ($this->target_seconds_between_rpcs_ * 1e6);
385                 usleep($sleep_us);
386             }
387             $this->check_child_process_result();
388             foreach ($this->rpc_config->rpcs_to_send as $rpc) {
389                 $metadata_to_send_arr
390                     = (array)$this->rpc_config->metadata_to_send;
391                 $metadata = array_key_exists($rpc, $metadata_to_send_arr) ?
392                           $metadata_to_send_arr[$rpc] : [];
393                 // This copy is somehow necessary because
394                 // $this->metadata_to_send[$rpc] somehow becomes a
395                 // Volatile object, instead of an associative array.
396                 $metadata_array = [];
397                 $execute_in_child_process = false;
398                 foreach ($metadata as $key => $value) {
399                     $metadata_array[$key] = [$value];
400                     if ($key == 'rpc-behavior' || $key == 'fi_testcase') {
401                         $execute_in_child_process = true;
402                     }
403                 }
404                 if ($execute_in_child_process && $this->rpc_config->tmp_file1) {
405                     // if 'rpc-behavior' is set, we need to pawn off
406                     // the execution to some other child PHP processes
407                     $this->execute_rpc_in_child_process(
408                         $rpc, serialize($metadata_array));
409                     continue;
410                 }
411                 // Execute RPC within this script
412                 $call = null;
413                 if ($rpc == 'UnaryCall') {
414                     $call = $this->sendUnaryCall($stub, $metadata_array);
415                 } else if ($rpc == 'EmptyCall') {
416                     $call = $this->sendEmptyCall($stub, $metadata_array);
417                 } else {
418                     throw new Exception("Unhandled rpc $rpc");
419                 }
420                 $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1;
421                 $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
422                     ['rpcs_started'] += 1;
423                 // the remote peer is being returned as part of the
424                 // initial metadata, according to the test spec
425                 $initial_metadata = $call->getMetadata();
426                 list($response, $status) = $call->wait();
427                 if ($status->code == Grpc\STATUS_OK &&
428                     array_key_exists('hostname', $initial_metadata)) {
429                     $this->results[$rpc][$this->num_results]
430                         = $initial_metadata['hostname'][0];
431                     $this->num_rpcs_succeeded_by_method
432                         [$this->RPC_MAP[$rpc]] += 1;
433                     $this->add_rpc_result($rpc, 0);
434                 } else {
435                     if ($this->rpc_config->fail_on_failed_rpcs_) {
436                         throw new Exception("$rpc failed with status "
437                                             . $status->code);
438                     }
439                     $this->results[$rpc][$this->num_results] = "";
440                     $this->num_rpcs_failed_by_method
441                         [$this->RPC_MAP[$rpc]] += 1;
442                     $this->add_rpc_result($rpc, $status->code);
443                 }
444             }
445             // $num_results here is only incremented when the group of
446             // all $rpcs_to_send are done.
447             $this->num_results++;
448         }
449     }
450
451     // This is needed for loading autoload_path in the child thread
452     public function start(int $options = PTHREADS_INHERIT_ALL) {
453         return parent::start(PTHREADS_INHERIT_NONE);
454     }
455 }
456
457
458 // Note: num_channels are currently ignored for now
459 $args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:',
460                     'rpc:', 'metadata:', 'tmp_file1:', 'tmp_file2:',
461                     'server:', 'stats_port:', 'qps:']);
462
463 // Convert input in the form of
464 //   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
465 // into
466 //   [
467 //     'rpc1' => [
468 //       'k1' => 'v1',
469 //       'k3' => 'v3',
470 //     ],
471 //     'rpc2' => [
472 //       'k2' => 'v2'
473 //     ],
474 //   ]
475 $metadata_to_send = [];
476 if ($_all_metadata = explode(',', $args['metadata'])) {
477     foreach ($_all_metadata as $one_metadata_pair) {
478         list($rpc,
479              $metadata_key,
480              $metadata_value) = explode(':', $one_metadata_pair);
481         // initialize in case we haven't seen this rpc before
482         if (!array_key_exists($rpc, $metadata_to_send)) {
483             $metadata_to_send[$rpc] = [];
484         }
485         $metadata_to_send[$rpc][$metadata_key] = $metadata_value;
486     }
487 }
488 $rpcs_to_send = (empty($args['rpc']) ? 'UnaryCall' : $args['rpc']);
489
490 // Need to communicate the xds server name to the async runner manager
491 if ($args['tmp_file1']) {
492     $f1 = fopen($args['tmp_file1'], 'w');
493     fwrite($f1, 'server_address,'.$args['server']);
494     fclose($f1);
495 }
496
497 $rpc_config = new RpcConfig($args['server'],
498                             $args['qps'],
499                             $args['fail_on_failed_rpcs'],
500                             explode(',', $rpcs_to_send),
501                             $metadata_to_send,
502                             $args['tmp_file1'],
503                             $args['tmp_file2']);
504
505
506 $client_thread = new ClientThread($rpc_config,
507                                   $autoload_path);
508 $client_thread->start();
509
510 $server = new Grpc\RpcServer();
511 $server->addHttp2Port('0.0.0.0:'.$args['stats_port']);
512 $server->handle(new LoadBalancerStatsService());
513 $server->handle(new XdsUpdateClientConfigureService());
514 $server->run();