Imported Upstream version 1.37.1
[platform/upstream/grpc.git] / src / php / tests / interop / xds_client.php
1 <?php
2 /*
3  *
4  * Copyright 2020 gRPC authors.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  */
19
20 /**
21  * This is the PHP xDS Interop test client. This script is meant to be run by
22  * the main xDS Interep test runner "run_xds_tests.py", not to be run
23  * by itself standalone.
24  */
25 $autoload_path = realpath(dirname(__FILE__).'/../../vendor/autoload.php');
26 require_once $autoload_path;
27
28 class XdsUpdateClientConfigureService
29         extends \Grpc\Testing\XdsUpdateClientConfigureServiceStub
30 {
31     function configure(\Grpc\Testing\ClientConfigureRequest $request) {
32         $rpc_types = $request->getTypes();
33         $all_metadata = $request->getMetadata();
34         $rpcs_to_send = [];
35         foreach ($rpc_types as $rpc_type) {
36             if ($rpc_type ==
37                 \Grpc\Testing\ClientConfigureRequest\RpcType::EMPTY_CALL) {
38                 $rpcs_to_send[] = 'EmptyCall';
39             } else if ($rpc_type ==
40                        \Grpc\Testing\ClientConfigureRequest\RpcType::UNARY_CALL) {
41                 $rpcs_to_send[] = 'UnaryCall';
42             }
43         }
44         $metadata_to_send = [];
45         foreach ($all_metadata as $metadata) {
46             $rpc_type = $metadata->getType();
47             if ($rpc_type ==
48                 \Grpc\Testing\ClientConfigureRequest\RpcType::EMPTY_CALL) {
49                 $rpc_type_key = 'EmptyCall';
50             } else if ($rpc_type ==
51                        \Grpc\Testing\ClientConfigureRequest\RpcType::UNARY_CALL) {
52                 $rpc_type_key = 'UnaryCall';
53             }
54             $key = $metadata->getKey();
55             $value = $metadata->getValue();
56             if (!isset($metadata_to_send[$rpc_type_key])) {
57                 $metadata_to_send[$rpc_type_key] = [];
58             }
59             $metadata_to_send[$rpc_type_key][$key] = $value;
60         }
61         global $client_thread;
62         echo "PHP parent: Setting client_thread rpc_config to \n";
63         print_r($rpcs_to_send);
64         print_r($metadata_to_send);
65         echo "PHP parent: timeout_sec = ".$request->getTimeoutSec()."\n";
66         $client_thread->rpc_config->update($rpcs_to_send,
67                                            $metadata_to_send,
68                                            $request->getTimeoutSec());
69         return new Grpc\Testing\ClientConfigureResponse();
70     }
71 }
72
73 // The main xds interop test runner will ping this service to ask for
74 // the stats of the distribution of the backends, for the next X rpcs.
75 class LoadBalancerStatsService
76     extends \Grpc\Testing\LoadBalancerStatsServiceStub
77 {
78     function getClientStats(\Grpc\Testing\LoadBalancerStatsRequest $request) {
79         $num_rpcs = $request->getNumRpcs();
80         $timeout_sec = $request->getTimeoutSec();
81         $rpcs_by_method = [];
82         $rpcs_by_peer = [];
83         $num_failures = 0;
84
85         // Heavy limitation now: the server is blocking, until all
86         // the necessary num_rpcs are finished, or timeout is reached
87         global $client_thread;
88         $start_id = $client_thread->num_results + 1;
89         $end_id = $start_id + $num_rpcs;
90         $now = hrtime(true);
91         $timeout = $now[0] + ($now[1] / 1e9) + $timeout_sec;
92         while (true) {
93             $curr_hr = hrtime(true);
94             $curr_time = $curr_hr[0] + ($curr_hr[1] / 1e9);
95             if ($curr_time > $timeout) {
96                 break;
97             }
98             // Thread variable seems to be read-only
99             $curr_id = $client_thread->num_results;
100             if ($curr_id >= $end_id) {
101                 break;
102             }
103             usleep(50000);
104         }
105
106         // Tally up results
107         $end_id = min($end_id, $client_thread->num_results);
108         // "$client_thread->results" will be in the form of
109         // [
110         //   'rpc1' => [
111         //     'hostname1', '', 'hostname2', 'hostname1', '', ...
112         //   ],
113         //   'rpc2' => [
114         //     '', 'hostname1', 'hostname2', '', 'hostname2', ...
115         //   ],
116         // ]
117         foreach ((array)$client_thread->rpc_config->rpcs_to_send as $rpc) {
118             $results = $client_thread->results[$rpc];
119             // initialize, can always start from scratch here
120             $rpcs_by_method[$rpc] = [];
121             for ($i = $start_id; $i < $end_id; $i++) {
122                 $hostname = $results[$i];
123                 if ($hostname) {
124                     // initialize in case we haven't seen this hostname
125                     // before
126                     if (!array_key_exists($hostname, $rpcs_by_method[$rpc])) {
127                         $rpcs_by_method[$rpc][$hostname] = 0;
128                     }
129                     if (!array_key_exists($hostname, $rpcs_by_peer)) {
130                         $rpcs_by_peer[$hostname] = 0;
131                     }
132                     // increment the remote hostname distribution histogram
133                     // both by overall, and broken down per RPC
134                     $rpcs_by_method[$rpc][$hostname] += 1;
135                     $rpcs_by_peer[$hostname] += 1;
136                 } else {
137                     // $num_failures here are counted per individual RPC
138                     $num_failures += 1;
139                 }
140             }
141         }
142
143         // Convert our hashmaps above into protobuf objects
144         $response = new Grpc\Testing\LoadBalancerStatsResponse();
145         $rpcs_by_method_map = [];
146         foreach ($rpcs_by_method as $rpc => $rpcs_by_peer_per_method) {
147             $rpcs_by_peer_proto_obj
148                 = new Grpc\Testing\LoadBalancerStatsResponse\RpcsByPeer();
149             $rpcs_by_peer_proto_obj->setRpcsByPeer($rpcs_by_peer_per_method);
150             $rpcs_by_method_map[$rpc] = $rpcs_by_peer_proto_obj;
151         }
152         $response->setRpcsByPeer($rpcs_by_peer);
153         $response->setRpcsByMethod($rpcs_by_method_map);
154         $response->setNumFailures($num_failures);
155         return $response;
156     }
157
158     function getClientAccumulatedStats(
159         \Grpc\Testing\LoadBalancerAccumulatedStatsRequest $request) {
160         global $client_thread;
161         $response = new Grpc\Testing\LoadBalancerAccumulatedStatsResponse();
162         $response->setNumRpcsStartedByMethod(
163             (array)$client_thread->num_rpcs_started_by_method);
164         $response->setNumRpcsSucceededByMethod(
165             (array)$client_thread->num_rpcs_succeeded_by_method);
166         $response->setNumRpcsFailedByMethod(
167             (array)$client_thread->num_rpcs_failed_by_method);
168         $accumulated_method_stats
169             = (array)$client_thread->accumulated_method_stats;
170         $stats_per_method = [];
171         foreach ($accumulated_method_stats as $rpc_name => $stats) {
172             $methodStats
173                 = new Grpc\Testing\LoadBalancerAccumulatedStatsResponse\MethodStats();
174             $methodStats->setRpcsStarted($stats['rpcs_started']);
175             $methodStats->setResult((array)$stats['result']);
176             $stats_per_method[$rpc_name] = $methodStats;
177         }
178         $response->setStatsPerMethod($stats_per_method);
179         return $response;
180     }
181 }
182
183 class RpcConfig extends Volatile {
184     public $server_address;
185     public $qps;
186     public $fail_on_failed_rpcs;
187     public $rpcs_to_send;
188     public $metadata_to_send;
189     public $tmp_file1;
190     public $tmp_file2;
191     public $timeout_sec;
192     public function __construct($server_address,
193                                 $qps,
194                                 $fail_on_failed_rpcs,
195                                 $rpcs_to_send,
196                                 $metadata_to_send,
197                                 $tmp_file1,
198                                 $tmp_file2) {
199         $this->server_address = $server_address;
200         $this->qps = $qps;
201         $this->fail_on_failed_rpcs = $fail_on_failed_rpcs;
202         $this->rpcs_to_send = (array)$rpcs_to_send;
203         $this->metadata_to_send = (array)$metadata_to_send;
204         $this->tmp_file1 = $tmp_file1;
205         $this->tmp_file2 = $tmp_file2;
206         $this->timeout_sec = 30;
207     }
208     public function update($rpcs_to_send, $metadata_to_send, $timeout_sec) {
209         $this->rpcs_to_send = (array)$rpcs_to_send;
210         $this->metadata_to_send = (array)$metadata_to_send;
211         $this->timeout_sec = $timeout_sec;
212     }
213 }
214
215 // This client thread blindly sends a unary RPC to the server once
216 // every 1 / qps seconds.
217 class ClientThread extends Thread {
218     private $target_seconds_between_rpcs_;
219     private $autoload_path_;
220     private $TIMEOUT_US = 30 * 1e6; // 30 seconds
221     public $rpc_config;
222     public $num_results = 0;
223     public $results;
224
225     public $RPC_MAP = [
226         'UnaryCall' => 'UNARY_CALL',
227         'EmptyCall' => 'EMPTY_CALL',
228     ];
229
230     public $num_rpcs_started_by_method = [];
231     public $num_rpcs_succeeded_by_method = [];
232     public $num_rpcs_failed_by_method = [];
233     public $accumulated_method_stats = [];
234
235     public function __construct($rpc_config,
236                                 $autoload_path) {
237         $this->rpc_config = $rpc_config;
238         $this->target_seconds_between_rpcs_ = 1.0 / $rpc_config->qps;
239         $this->autoload_path_ = $autoload_path;
240         $this->simple_request = new Grpc\Testing\SimpleRequest();
241         $this->empty_request = new Grpc\Testing\EmptyMessage();
242         $this->results = [];
243         foreach (['UnaryCall', 'EmptyCall'] as $rpc) {
244             $this->results[$rpc] = [];
245         }
246         $this->outstanding_rpcs = [];
247         foreach (['UNARY_CALL', 'EMPTY_CALL'] as $rpc_stats_key) {
248             $this->num_rpcs_started_by_method[$rpc_stats_key] = 0;
249             $this->num_rpcs_succeeded_by_method[$rpc_stats_key] = 0;
250             $this->num_rpcs_failed_by_method[$rpc_stats_key] = 0;
251             $this->accumulated_method_stats[$rpc_stats_key] = [
252                 'rpcs_started' => 0,
253                 'result' => [],
254             ];
255         }
256     }
257
258     public function sendUnaryCall($stub, $metadata) {
259         $timeout = $this->rpc_config->timeout_sec ?
260                  $this->rpc_config->timeout_sec * 1e6 :
261                  $this->TIMEOUT_US;
262         return $stub->UnaryCall($this->simple_request,
263                                 $metadata,
264                                 ['timeout' => $timeout]);
265     }
266
267     public function sendEmptyCall($stub, $metadata) {
268         $timeout = $this->rpc_config->timeout_sec ?
269                  $this->rpc_config->timeout_sec * 1e6 :
270                  $this->TIMEOUT_US;
271         return $stub->EmptyCall($this->empty_request,
272                                 $metadata,
273                                 ['timeout' => $timeout]);
274     }
275
276     public function add_rpc_result($rpc, $status_code) {
277         // $rpc here needs to be in the format of 'UnaryCall', 'EmptyCall'
278         if (!isset($this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
279                    ['result'][$status_code])) {
280             $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
281                 ['result'][$status_code] = 0;
282         }
283         $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
284             ['result'][$status_code] += 1;
285     }
286
287     public function check_child_process_result() {
288         if (sizeof($this->outstanding_rpcs) > 0 &&
289             $this->rpc_config->tmp_file2) {
290             $keys_to_delete = [];
291             // tmp_file2 contains the RPC result of each RPC we
292             // originally wrote to tmp_file1
293             $f2 = fopen($this->rpc_config->tmp_file2, 'r+');
294             flock($f2, LOCK_EX);
295             while (true) {
296                 $f2_line = fgets($f2);
297                 if (!$f2_line) {
298                     break;
299                 }
300                 // format here needs to be in sync with
301                 // src/php/bin/xds_manager.py
302                 $parts = explode(',', trim($f2_line));
303                 $key = $parts[0];
304                 $returncode = $parts[1];
305                 if (isset($this->outstanding_rpcs[$key])) {
306                     $parts2 = explode('|', $key);
307                     $result_num = $parts2[0];
308                     $rpc_name = $parts2[1];
309                     // Child processes can only communicate back the
310                     // status code for now.
311                     // Current interop test specs only call for
312                     // reporting back the status code in these scenarios.
313                     // If we ever need the hostname reported back from
314                     // child processes, we need to enhance this
315                     // communication framework through tmp files.
316                     $this->results[$rpc_name][$result_num] = "";
317                     if ($returncode) {
318                         $this->num_rpcs_failed_by_method
319                             [$this->RPC_MAP[$rpc_name]] += 1;
320                     } else {
321                         $this->num_rpcs_succeeded_by_method
322                             [$this->RPC_MAP[$rpc_name]] += 1;
323                     }
324                     $this->add_rpc_result($rpc_name, $returncode);
325                     $keys_to_delete[] = $key;
326                 }
327             }
328             foreach ($keys_to_delete as $key) {
329                 unset($this->outstanding_rpcs[$key]);
330             }
331             ftruncate($f2, 0);
332             flock($f2, LOCK_UN);
333             fclose($f2);
334         }
335     }
336
337     public function execute_rpc_in_child_process($rpc, $metadata_serialized) {
338         // tmp_file1 contains the list of RPCs (and their
339         // specs) we want executed. This will be picked up
340         // by src/php/bin/xds_manager.py
341         $f1 = fopen($this->rpc_config->tmp_file1, 'a');
342         $key = implode('|', [$this->num_results,
343                              $rpc,
344                              $metadata_serialized,
345                              $this->rpc_config->timeout_sec]);
346         flock($f1, LOCK_EX);
347         fwrite($f1, $key."\n");
348         fflush($f1);
349         flock($f1, LOCK_UN);
350         fclose($f1);
351         $this->outstanding_rpcs[$key] = 1;
352         $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1;
353         $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
354             ['rpcs_started'] += 1;
355     }
356
357     public function run() {
358         // Autoloaded classes do not get inherited in threads.
359         // Hence we need to do this.
360         require_once($this->autoload_path_);
361
362         $stub = new Grpc\Testing\TestServiceClient(
363             $this->rpc_config->server_address,
364             ['credentials' => Grpc\ChannelCredentials::createInsecure()
365         ]);
366         // hrtime returns nanoseconds
367         $target_next_start_us = hrtime(true) / 1000;
368         while (true) {
369             $now_us = hrtime(true) / 1000;
370             $sleep_us = $target_next_start_us - $now_us;
371             if ($sleep_us < 0) {
372                 $target_next_start_us =
373                         $now_us + ($this->target_seconds_between_rpcs_ * 1e6);
374             } else {
375                 $target_next_start_us +=
376                         ($this->target_seconds_between_rpcs_ * 1e6);
377                 usleep($sleep_us);
378             }
379             $this->check_child_process_result();
380             foreach ($this->rpc_config->rpcs_to_send as $rpc) {
381                 $metadata_to_send_arr
382                     = (array)$this->rpc_config->metadata_to_send;
383                 $metadata = array_key_exists($rpc, $metadata_to_send_arr) ?
384                           $metadata_to_send_arr[$rpc] : [];
385                 // This copy is somehow necessary because
386                 // $this->metadata_to_send[$rpc] somehow becomes a
387                 // Volatile object, instead of an associative array.
388                 $metadata_array = [];
389                 $execute_in_child_process = false;
390                 foreach ($metadata as $key => $value) {
391                     $metadata_array[$key] = [$value];
392                     if ($key == 'rpc-behavior' || $key == 'fi_testcase') {
393                         $execute_in_child_process = true;
394                     }
395                 }
396                 if ($execute_in_child_process && $this->rpc_config->tmp_file1) {
397                     // if 'rpc-behavior' is set, we need to pawn off
398                     // the execution to some other child PHP processes
399                     $this->execute_rpc_in_child_process(
400                         $rpc, serialize($metadata_array));
401                     continue;
402                 }
403                 // Execute RPC within this script
404                 $call = null;
405                 if ($rpc == 'UnaryCall') {
406                     $call = $this->sendUnaryCall($stub, $metadata_array);
407                 } else if ($rpc == 'EmptyCall') {
408                     $call = $this->sendEmptyCall($stub, $metadata_array);
409                 } else {
410                     throw new Exception("Unhandled rpc $rpc");
411                 }
412                 $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1;
413                 $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
414                     ['rpcs_started'] += 1;
415                 // the remote peer is being returned as part of the
416                 // initial metadata, according to the test spec
417                 $initial_metadata = $call->getMetadata();
418                 list($response, $status) = $call->wait();
419                 if ($status->code == Grpc\STATUS_OK &&
420                     array_key_exists('hostname', $initial_metadata)) {
421                     $this->results[$rpc][$this->num_results]
422                         = $initial_metadata['hostname'][0];
423                     $this->num_rpcs_succeeded_by_method
424                         [$this->RPC_MAP[$rpc]] += 1;
425                     $this->add_rpc_result($rpc, 0);
426                 } else {
427                     if ($this->rpc_config->fail_on_failed_rpcs_) {
428                         throw new Exception("$rpc failed with status "
429                                             . $status->code);
430                     }
431                     $this->results[$rpc][$this->num_results] = "";
432                     $this->num_rpcs_failed_by_method
433                         [$this->RPC_MAP[$rpc]] += 1;
434                     $this->add_rpc_result($rpc, $status->code);
435                 }
436             }
437             // $num_results here is only incremented when the group of
438             // all $rpcs_to_send are done.
439             $this->num_results++;
440         }
441     }
442
443     // This is needed for loading autoload_path in the child thread
444     public function start(int $options = PTHREADS_INHERIT_ALL) {
445         return parent::start(PTHREADS_INHERIT_NONE);
446     }
447 }
448
449
450 // Note: num_channels are currently ignored for now
451 $args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:',
452                     'rpc:', 'metadata:', 'tmp_file1:', 'tmp_file2:',
453                     'server:', 'stats_port:', 'qps:']);
454
455 // Convert input in the form of
456 //   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
457 // into
458 //   [
459 //     'rpc1' => [
460 //       'k1' => 'v1',
461 //       'k3' => 'v3',
462 //     ],
463 //     'rpc2' => [
464 //       'k2' => 'v2'
465 //     ],
466 //   ]
467 $metadata_to_send = [];
468 if ($_all_metadata = explode(',', $args['metadata'])) {
469     foreach ($_all_metadata as $one_metadata_pair) {
470         list($rpc,
471              $metadata_key,
472              $metadata_value) = explode(':', $one_metadata_pair);
473         // initialize in case we haven't seen this rpc before
474         if (!array_key_exists($rpc, $metadata_to_send)) {
475             $metadata_to_send[$rpc] = [];
476         }
477         $metadata_to_send[$rpc][$metadata_key] = $metadata_value;
478     }
479 }
480 $rpcs_to_send = (empty($args['rpc']) ? 'UnaryCall' : $args['rpc']);
481
482 // Need to communicate the xds server name to the async runner manager
483 if ($args['tmp_file1']) {
484     $f1 = fopen($args['tmp_file1'], 'w');
485     fwrite($f1, 'server_address,'.$args['server']);
486     fclose($f1);
487 }
488
489 $rpc_config = new RpcConfig($args['server'],
490                             $args['qps'],
491                             $args['fail_on_failed_rpcs'],
492                             explode(',', $rpcs_to_send),
493                             $metadata_to_send,
494                             $args['tmp_file1'],
495                             $args['tmp_file2']);
496
497
498 $client_thread = new ClientThread($rpc_config,
499                                   $autoload_path);
500 $client_thread->start();
501
502 $server = new Grpc\RpcServer();
503 $server->addHttp2Port('0.0.0.0:'.$args['stats_port']);
504 $server->handle(new LoadBalancerStatsService());
505 $server->handle(new XdsUpdateClientConfigureService());
506 $server->run();