4 * Copyright 2020 gRPC authors.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
21 * This is the PHP xDS Interop test client. This script is meant to be run by
 * the main xDS Interop test runner "run_xds_tests.py", not to be run
23 * by itself standalone.
25 $autoload_path = realpath(dirname(__FILE__).'/../../vendor/autoload.php');
26 require_once $autoload_path;
/**
 * Service that lets the test runner reconfigure, at runtime, which RPCs the
 * client thread sends and which metadata is attached to each of them.
 */
class XdsUpdateClientConfigureService
    extends \Grpc\Testing\XdsUpdateClientConfigureServiceStub
{
    /**
     * Translates a ClientConfigureRequest\RpcType value into the method
     * name used as a key by the client thread.
     *
     * @param mixed $rpc_type value compared against the RpcType constants
     * @return string|null 'EmptyCall', 'UnaryCall', or null when the type
     *                     is not one of the two handled here
     */
    private static function rpcTypeToName($rpc_type)
    {
        if ($rpc_type ==
            \Grpc\Testing\ClientConfigureRequest\RpcType::EMPTY_CALL) {
            return 'EmptyCall';
        }
        if ($rpc_type ==
            \Grpc\Testing\ClientConfigureRequest\RpcType::UNARY_CALL) {
            return 'UnaryCall';
        }
        return null;
    }

    /**
     * Applies the requested RPC types, per-RPC metadata and timeout to the
     * global client thread's rpc_config.
     *
     * @param \Grpc\Testing\ClientConfigureRequest $request new configuration
     * @param \Grpc\ServerContext $context server call context (unused)
     * @return \Grpc\Testing\ClientConfigureResponse|null empty response
     */
    function configure(
        \Grpc\Testing\ClientConfigureRequest $request,
        \Grpc\ServerContext $context
    ): ?\Grpc\Testing\ClientConfigureResponse {
        $rpcs_to_send = [];
        foreach ($request->getTypes() as $rpc_type) {
            $rpc_name = self::rpcTypeToName($rpc_type);
            if ($rpc_name !== null) {
                $rpcs_to_send[] = $rpc_name;
            }
        }

        $metadata_to_send = [];
        foreach ($request->getMetadata() as $metadata) {
            $rpc_type_key = self::rpcTypeToName($metadata->getType());
            if ($rpc_type_key === null) {
                // An unrecognized type must not be filed under whatever key
                // the previous entry happened to use, so skip it.
                continue;
            }
            if (!isset($metadata_to_send[$rpc_type_key])) {
                $metadata_to_send[$rpc_type_key] = [];
            }
            $metadata_to_send[$rpc_type_key][$metadata->getKey()]
                = $metadata->getValue();
        }

        global $client_thread;
        echo "PHP parent: Setting client_thread rpc_config to \n";
        print_r($rpcs_to_send);
        print_r($metadata_to_send);
        echo "PHP parent: timeout_sec = ".$request->getTimeoutSec()."\n";
        $client_thread->rpc_config->update($rpcs_to_send,
                                           $metadata_to_send,
                                           $request->getTimeoutSec());
        return new \Grpc\Testing\ClientConfigureResponse();
    }
}
/**
 * The main xds interop test runner will ping this service to ask for
 * the stats of the distribution of the backends, for the next X rpcs,
 * and for the accumulated per-method counters.
 */
class LoadBalancerStatsService
    extends \Grpc\Testing\LoadBalancerStatsServiceStub
{
    /**
     * Reports which peers answered the next `num_rpcs` rounds of RPCs.
     *
     * Blocks until the client thread has completed enough rounds or until
     * `timeout_sec` has elapsed, then tallies the recorded hostnames.
     *
     * @param \Grpc\Testing\LoadBalancerStatsRequest $request
     * @param \Grpc\ServerContext $context server call context (unused)
     * @return \Grpc\Testing\LoadBalancerStatsResponse|null
     */
    function getClientStats(
        \Grpc\Testing\LoadBalancerStatsRequest $request,
        \Grpc\ServerContext $context
    ): ?\Grpc\Testing\LoadBalancerStatsResponse {
        $num_rpcs = $request->getNumRpcs();
        $timeout_sec = $request->getTimeoutSec();
        $rpcs_by_method = [];
        $rpcs_by_peer = [];
        $num_failures = 0;

        // Heavy limitation now: the server is blocking, until all
        // the necessary num_rpcs are finished, or timeout is reached
        global $client_thread;
        $start_id = $client_thread->num_results + 1;
        $end_id = $start_id + $num_rpcs;
        // hrtime(true) yields one nanosecond counter (not a [sec, nsec]
        // pair), so the deadline is kept in nanoseconds as well.
        $deadline_ns = hrtime(true) + ($timeout_sec * 1e9);
        while (hrtime(true) <= $deadline_ns) {
            // Thread variable seems to be read-only
            if ($client_thread->num_results >= $end_id) {
                break;
            }
            usleep(50000);
        }

        // On timeout, only tally the rounds that actually finished.
        $end_id = min($end_id, $client_thread->num_results);
        // "$client_thread->results" will be in the form of
        // [
        //   'rpc1' => ['hostname1', '', 'hostname2', 'hostname1', '', ...],
        //   'rpc2' => ['', 'hostname1', 'hostname2', '', 'hostname2', ...],
        // ]
        // where an empty string marks an RPC without a recorded hostname.
        foreach ((array)$client_thread->rpc_config->rpcs_to_send as $rpc) {
            $results = $client_thread->results[$rpc];
            // initialize, can always start from scratch here
            $rpcs_by_method[$rpc] = [];
            for ($i = $start_id; $i < $end_id; $i++) {
                $hostname = $results[$i];
                if ($hostname) {
                    // initialize in case we haven't seen this hostname
                    // before
                    if (!array_key_exists($hostname, $rpcs_by_method[$rpc])) {
                        $rpcs_by_method[$rpc][$hostname] = 0;
                    }
                    if (!array_key_exists($hostname, $rpcs_by_peer)) {
                        $rpcs_by_peer[$hostname] = 0;
                    }
                    // increment the remote hostname distribution histogram
                    // both by overall, and broken down per RPC
                    $rpcs_by_method[$rpc][$hostname] += 1;
                    $rpcs_by_peer[$hostname] += 1;
                } else {
                    // $num_failures here are counted per individual RPC
                    $num_failures += 1;
                }
            }
        }

        // Convert our hashmaps above into protobuf objects
        $response = new \Grpc\Testing\LoadBalancerStatsResponse();
        $rpcs_by_method_map = [];
        foreach ($rpcs_by_method as $rpc => $rpcs_by_peer_per_method) {
            $rpcs_by_peer_proto_obj
                = new \Grpc\Testing\LoadBalancerStatsResponse\RpcsByPeer();
            $rpcs_by_peer_proto_obj->setRpcsByPeer($rpcs_by_peer_per_method);
            $rpcs_by_method_map[$rpc] = $rpcs_by_peer_proto_obj;
        }
        $response->setRpcsByPeer($rpcs_by_peer);
        $response->setRpcsByMethod($rpcs_by_method_map);
        $response->setNumFailures($num_failures);
        return $response;
    }

    /**
     * Reports the counters the client thread has accumulated since start:
     * RPCs started / succeeded / failed per method, plus the per-method
     * breakdown of result status codes.
     *
     * @param \Grpc\Testing\LoadBalancerAccumulatedStatsRequest $request
     * @param \Grpc\ServerContext $context server call context (unused)
     * @return \Grpc\Testing\LoadBalancerAccumulatedStatsResponse|null
     */
    function GetClientAccumulatedStats(
        \Grpc\Testing\LoadBalancerAccumulatedStatsRequest $request,
        \Grpc\ServerContext $context
    ): ?\Grpc\Testing\LoadBalancerAccumulatedStatsResponse {
        global $client_thread;
        $response = new \Grpc\Testing\LoadBalancerAccumulatedStatsResponse();
        // The thread's properties are cast to plain arrays before being
        // handed to the protobuf setters.
        $response->setNumRpcsStartedByMethod(
            (array)$client_thread->num_rpcs_started_by_method);
        $response->setNumRpcsSucceededByMethod(
            (array)$client_thread->num_rpcs_succeeded_by_method);
        $response->setNumRpcsFailedByMethod(
            (array)$client_thread->num_rpcs_failed_by_method);
        $accumulated_method_stats
            = (array)$client_thread->accumulated_method_stats;
        $stats_per_method = [];
        foreach ($accumulated_method_stats as $rpc_name => $stats) {
            $methodStats
                = new \Grpc\Testing\LoadBalancerAccumulatedStatsResponse\MethodStats();
            $methodStats->setRpcsStarted($stats['rpcs_started']);
            $methodStats->setResult((array)$stats['result']);
            $stats_per_method[$rpc_name] = $methodStats;
        }
        $response->setStatsPerMethod($stats_per_method);
        return $response;
    }
}
/**
 * Settings shared between the main script, the stats/configure services
 * and the client thread: what to send, where to send it, and how.
 */
class RpcConfig extends Volatile {
    public $server_address;
    public $qps;
    public $fail_on_failed_rpcs;
    public $rpcs_to_send;
    public $metadata_to_send;
    public $tmp_file1;
    public $tmp_file2;
    public $timeout_sec;

    /**
     * @param string $server_address target passed to the test service stub
     * @param mixed $qps rounds of RPCs per second
     * @param mixed $fail_on_failed_rpcs raw command-line flag value
     * @param array $rpcs_to_send method names, e.g. ['UnaryCall']
     * @param array $metadata_to_send map of method name => [key => value]
     * @param mixed $tmp_file1 file the RPCs for child processes are written to
     * @param mixed $tmp_file2 file the child process results are read from
     */
    public function __construct(
        $server_address,
        $qps,
        $fail_on_failed_rpcs,
        $rpcs_to_send,
        $metadata_to_send,
        $tmp_file1,
        $tmp_file2
    ) {
        $this->server_address = $server_address;
        $this->qps = $qps;
        $this->fail_on_failed_rpcs = $fail_on_failed_rpcs;
        $this->tmp_file1 = $tmp_file1;
        $this->tmp_file2 = $tmp_file2;
        // Initial RPC list and metadata, with the default 30 second timeout.
        $this->update($rpcs_to_send, $metadata_to_send, 30);
    }

    /**
     * Replaces the set of RPCs to send, their metadata and the per-RPC
     * timeout (in seconds).
     */
    public function update($rpcs_to_send, $metadata_to_send, $timeout_sec) {
        $this->rpcs_to_send = (array)$rpcs_to_send;
        $this->metadata_to_send = (array)$metadata_to_send;
        $this->timeout_sec = $timeout_sec;
    }
}
/**
 * This client thread blindly sends one round of the configured RPCs to the
 * server once every 1 / qps seconds and records the outcome of each RPC.
 *
 * RPCs carrying 'rpc-behavior' or 'fi_testcase' metadata are handed off to
 * child processes through tmp_file1; their status codes are read back from
 * tmp_file2 on later iterations.
 */
class ClientThread extends Thread {
    private $target_seconds_between_rpcs_;
    private $autoload_path_;
    private $TIMEOUT_US = 30 * 1e6; // 30 seconds
    public $rpc_config;
    public $num_results = 0;
    public $results;

    // RPC method name => key used in the per-method statistics
    public $RPC_MAP = [
        'UnaryCall' => 'UNARY_CALL',
        'EmptyCall' => 'EMPTY_CALL',
    ];

    public $num_rpcs_started_by_method = [];
    public $num_rpcs_succeeded_by_method = [];
    public $num_rpcs_failed_by_method = [];
    public $accumulated_method_stats = [];

    /**
     * @param RpcConfig $rpc_config shared configuration; its qps value sets
     *                              the pause between rounds
     * @param string $autoload_path autoloader to require inside the thread
     */
    public function __construct($rpc_config,
                                $autoload_path) {
        $this->rpc_config = $rpc_config;
        $this->target_seconds_between_rpcs_ = 1.0 / $rpc_config->qps;
        $this->autoload_path_ = $autoload_path;
        $this->simple_request = new Grpc\Testing\SimpleRequest();
        $this->empty_request = new Grpc\Testing\EmptyMessage();
        $this->results = [];
        foreach (['UnaryCall', 'EmptyCall'] as $rpc) {
            $this->results[$rpc] = [];
        }
        $this->outstanding_rpcs = [];
        foreach (['UNARY_CALL', 'EMPTY_CALL'] as $rpc_stats_key) {
            $this->num_rpcs_started_by_method[$rpc_stats_key] = 0;
            $this->num_rpcs_succeeded_by_method[$rpc_stats_key] = 0;
            $this->num_rpcs_failed_by_method[$rpc_stats_key] = 0;
            $this->accumulated_method_stats[$rpc_stats_key] = [
                'rpcs_started' => 0,
                'result' => [],
            ];
        }
    }

    // Per-RPC timeout in microseconds: the configured timeout_sec when it
    // is set (non-zero), otherwise the 30 second default.
    private function rpc_timeout_us() {
        return $this->rpc_config->timeout_sec ?
            $this->rpc_config->timeout_sec * 1e6 :
            $this->TIMEOUT_US;
    }

    // Starts a UnaryCall with the given metadata; returns the call object.
    public function sendUnaryCall($stub, $metadata) {
        return $stub->UnaryCall($this->simple_request,
                                $metadata,
                                ['timeout' => $this->rpc_timeout_us()]);
    }

    // Starts an EmptyCall with the given metadata; returns the call object.
    public function sendEmptyCall($stub, $metadata) {
        return $stub->EmptyCall($this->empty_request,
                                $metadata,
                                ['timeout' => $this->rpc_timeout_us()]);
    }

    // Counts one more RPC of the given method ending with $status_code.
    public function add_rpc_result($rpc, $status_code) {
        // $rpc here needs to be in the format of 'UnaryCall', 'EmptyCall'
        if (!isset($this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
                   ['result'][$status_code])) {
            $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
                ['result'][$status_code] = 0;
        }
        $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
            ['result'][$status_code] += 1;
    }

    // Collects the status codes of RPCs that were handed to child
    // processes, updates the counters, and clears the results file.
    public function check_child_process_result() {
        if (sizeof($this->outstanding_rpcs) == 0 ||
            !$this->rpc_config->tmp_file2) {
            return;
        }
        // tmp_file2 contains the RPC result of each RPC we
        // originally wrote to tmp_file1
        $f2 = fopen($this->rpc_config->tmp_file2, 'r+');
        if ($f2 === false) {
            // Nothing to read this time; try again on the next iteration.
            return;
        }
        $keys_to_delete = [];
        flock($f2, LOCK_EX);
        while (($f2_line = fgets($f2)) !== false) {
            // format here needs to be in sync with
            // src/php/bin/xds_manager.py
            $parts = explode(',', trim($f2_line));
            if (count($parts) < 2) {
                // blank or malformed line
                continue;
            }
            $key = $parts[0];
            $returncode = $parts[1];
            if (isset($this->outstanding_rpcs[$key])) {
                $parts2 = explode('|', $key);
                $result_num = $parts2[0];
                $rpc_name = $parts2[1];
                // Child processes can only communicate back the
                // status code for now.
                // Current interop test specs only call for
                // reporting back the status code in these scenarios.
                // If we ever need the hostname reported back from
                // child processes, we need to enhance this
                // communication framework through tmp files.
                $this->results[$rpc_name][$result_num] = "";
                if ($returncode) {
                    $this->num_rpcs_failed_by_method
                        [$this->RPC_MAP[$rpc_name]] += 1;
                } else {
                    $this->num_rpcs_succeeded_by_method
                        [$this->RPC_MAP[$rpc_name]] += 1;
                }
                $this->add_rpc_result($rpc_name, $returncode);
                $keys_to_delete[] = $key;
            }
        }
        foreach ($keys_to_delete as $key) {
            unset($this->outstanding_rpcs[$key]);
        }
        ftruncate($f2, 0);
        flock($f2, LOCK_UN);
        fclose($f2);
    }

    // Queues one RPC for execution by a child process and counts it as
    // started.
    public function execute_rpc_in_child_process($rpc, $metadata_serialized) {
        // tmp_file1 contains the list of RPCs (and their
        // specs) we want executed. This will be picked up
        // by src/php/bin/xds_manager.py
        $key = implode('|', [$this->num_results,
                             $rpc,
                             $metadata_serialized,
                             $this->rpc_config->timeout_sec]);
        $f1 = fopen($this->rpc_config->tmp_file1, 'a');
        flock($f1, LOCK_EX);
        fwrite($f1, $key."\n");
        fflush($f1);
        flock($f1, LOCK_UN);
        fclose($f1);
        $this->outstanding_rpcs[$key] = 1;
        $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1;
        $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
            ['rpcs_started'] += 1;
    }

    /**
     * Thread body: loops forever, sending one round of RPCs per tick.
     *
     * @throws Exception on an unknown RPC name, or on a failed RPC when
     *                   fail_on_failed_rpcs is enabled
     */
    public function run() {
        // Autoloaded classes do not get inherited in threads.
        // Hence we need to do this.
        require_once($this->autoload_path_);

        $stub = new Grpc\Testing\TestServiceClient(
            $this->rpc_config->server_address,
            ['credentials' => Grpc\ChannelCredentials::createInsecure()]
        );
        // The flag arrives as a command-line string, so "false" and "0"
        // must be parsed rather than tested for truthiness.
        $fail_on_failed_rpcs = filter_var(
            $this->rpc_config->fail_on_failed_rpcs,
            FILTER_VALIDATE_BOOLEAN);
        // hrtime returns nanoseconds
        $target_next_start_us = hrtime(true) / 1000;
        while (true) {
            $now_us = hrtime(true) / 1000;
            $sleep_us = $target_next_start_us - $now_us;
            if ($sleep_us < 0) {
                // Running behind schedule: restart the schedule from now.
                $target_next_start_us =
                    $now_us + ($this->target_seconds_between_rpcs_ * 1e6);
            } else {
                $target_next_start_us +=
                    ($this->target_seconds_between_rpcs_ * 1e6);
                usleep($sleep_us);
            }
            $this->check_child_process_result();
            foreach ($this->rpc_config->rpcs_to_send as $rpc) {
                $metadata_to_send_arr
                    = (array)$this->rpc_config->metadata_to_send;
                $metadata = array_key_exists($rpc, $metadata_to_send_arr) ?
                    $metadata_to_send_arr[$rpc] : [];
                // This copy is somehow necessary because
                // $this->metadata_to_send[$rpc] somehow becomes a
                // Volatile object, instead of an associative array.
                $metadata_array = [];
                $execute_in_child_process = false;
                foreach ($metadata as $key => $value) {
                    $metadata_array[$key] = [$value];
                    if ($key == 'rpc-behavior' || $key == 'fi_testcase') {
                        $execute_in_child_process = true;
                    }
                }
                if ($execute_in_child_process && $this->rpc_config->tmp_file1) {
                    // if 'rpc-behavior' is set, we need to pawn off
                    // the execution to some other child PHP processes
                    $this->execute_rpc_in_child_process(
                        $rpc, serialize($metadata_array));
                    continue;
                }
                // Execute RPC within this script
                $call = null;
                if ($rpc == 'UnaryCall') {
                    $call = $this->sendUnaryCall($stub, $metadata_array);
                } else if ($rpc == 'EmptyCall') {
                    $call = $this->sendEmptyCall($stub, $metadata_array);
                } else {
                    throw new Exception("Unhandled rpc $rpc");
                }
                $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1;
                $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
                    ['rpcs_started'] += 1;
                // the remote peer is being returned as part of the
                // initial metadata, according to the test spec
                $initial_metadata = $call->getMetadata();
                list($response, $status) = $call->wait();
                if ($status->code == Grpc\STATUS_OK &&
                    array_key_exists('hostname', $initial_metadata)) {
                    $this->results[$rpc][$this->num_results]
                        = $initial_metadata['hostname'][0];
                    $this->num_rpcs_succeeded_by_method
                        [$this->RPC_MAP[$rpc]] += 1;
                    $this->add_rpc_result($rpc, 0);
                } else {
                    if ($fail_on_failed_rpcs) {
                        throw new Exception("$rpc failed with status "
                                            . $status->code);
                    }
                    // An empty hostname marks this RPC as failed.
                    $this->results[$rpc][$this->num_results] = "";
                    $this->num_rpcs_failed_by_method
                        [$this->RPC_MAP[$rpc]] += 1;
                    $this->add_rpc_result($rpc, $status->code);
                }
            }
            // $num_results here is only incremented when the group of
            // all $rpcs_to_send are done.
            $this->num_results++;
        }
    }

    // This is needed for loading autoload_path in the child thread:
    // the thread is always started with PTHREADS_INHERIT_NONE, whatever
    // $options the caller passes.
    public function start(int $options = PTHREADS_INHERIT_ALL) {
        return parent::start(PTHREADS_INHERIT_NONE);
    }
}
// Script entry point: parse the command line, start the client thread and
// register the stats / configure services on the stats port.
// Note: num_channels are currently ignored for now
$args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:',
                    'rpc:', 'metadata:', 'tmp_file1:', 'tmp_file2:',
                    'server:', 'stats_port:', 'qps:']);

// Convert --metadata input in the form of
//   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
// into
//   [
//     'rpc1' => ['k1' => 'v1', 'k3' => 'v3'],
//     'rpc2' => ['k2' => 'v2'],
//   ]
$metadata_to_send = [];
foreach (explode(',', $args['metadata'] ?? '') as $one_metadata_pair) {
    // Limit of 3 keeps any ':' inside the value intact.
    $metadata_parts = explode(':', $one_metadata_pair, 3);
    if (count($metadata_parts) < 3) {
        // Empty --metadata (explode('') yields ['']) or a malformed entry:
        // nothing usable to record.
        continue;
    }
    list($rpc, $metadata_key, $metadata_value) = $metadata_parts;
    // initialize in case we haven't seen this rpc before
    if (!array_key_exists($rpc, $metadata_to_send)) {
        $metadata_to_send[$rpc] = [];
    }
    $metadata_to_send[$rpc][$metadata_key] = $metadata_value;
}
$rpcs_to_send = (empty($args['rpc']) ? 'UnaryCall' : $args['rpc']);

$tmp_file1 = $args['tmp_file1'] ?? null;
$tmp_file2 = $args['tmp_file2'] ?? null;

// Need to communicate the xds server name to the async runner manager
if ($tmp_file1) {
    // Truncates the file, writes the line and closes the handle.
    file_put_contents($tmp_file1, 'server_address,'.$args['server']);
}

$rpc_config = new RpcConfig($args['server'],
                            $args['qps'],
                            $args['fail_on_failed_rpcs'] ?? null,
                            explode(',', $rpcs_to_send),
                            $metadata_to_send,
                            $tmp_file1,
                            $tmp_file2);

// Kept in a global: the service classes reach it via "global $client_thread".
$client_thread = new ClientThread($rpc_config,
                                  $autoload_path);
$client_thread->start();

$server = new Grpc\RpcServer();
$server->addHttp2Port('0.0.0.0:'.$args['stats_port']);
$server->handle(new LoadBalancerStatsService());
$server->handle(new XdsUpdateClientConfigureService());
// NOTE(review): no call that starts serving is visible after the handlers
// are registered; confirm this is not duplicated further down the file.
$server->run();