4324e9dfc1724c9bd14a859c687e350043df38dd
[platform/upstream/grpc.git] / test / core / end2end / tests / load_reporting_hook.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <string.h>
20
21 #include <grpc/byte_buffer.h>
22 #include <grpc/load_reporting.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/string_util.h>
26 #include <grpc/support/time.h>
27
28 #include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
29 #include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/transport/static_metadata.h"
32
33 #include "test/core/end2end/cq_verifier.h"
34 #include "test/core/end2end/end2end_tests.h"
35
36 enum { TIMEOUT = 200000 };
37
38 static void* tag(intptr_t t) { return (void*)t; }
39
40 typedef struct {
41   gpr_mu mu;
42   intptr_t channel_id;
43   intptr_t call_id;
44
45   char* initial_md_str;
46   char* trailing_md_str;
47   char* method_name;
48
49   uint64_t incoming_bytes;
50   uint64_t outgoing_bytes;
51
52   grpc_status_code call_final_status;
53
54   bool fully_processed;
55 } load_reporting_data;
56
57 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
58                                             const char* test_name,
59                                             grpc_channel_args* client_args,
60                                             grpc_channel_args* server_args) {
61   grpc_end2end_test_fixture f;
62   gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
63
64   f = config.create_fixture(client_args, server_args);
65   config.init_server(&f, server_args);
66   config.init_client(&f, client_args);
67
68   return f;
69 }
70
71 static gpr_timespec n_seconds_from_now(int n) {
72   return grpc_timeout_seconds_to_deadline(n);
73 }
74
75 static gpr_timespec five_seconds_from_now(void) {
76   return n_seconds_from_now(5);
77 }
78
79 static void drain_cq(grpc_completion_queue* cq) {
80   grpc_event ev;
81   do {
82     ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
83   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
84 }
85
86 static void shutdown_server(grpc_end2end_test_fixture* f) {
87   if (!f->server) return;
88   grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
89   GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
90                                          grpc_timeout_seconds_to_deadline(5),
91                                          nullptr)
92                  .type == GRPC_OP_COMPLETE);
93   grpc_server_destroy(f->server);
94   f->server = nullptr;
95 }
96
97 static void shutdown_client(grpc_end2end_test_fixture* f) {
98   if (!f->client) return;
99   grpc_channel_destroy(f->client);
100   f->client = nullptr;
101 }
102
103 static void end_test(grpc_end2end_test_fixture* f) {
104   shutdown_server(f);
105   shutdown_client(f);
106
107   grpc_completion_queue_shutdown(f->cq);
108   drain_cq(f->cq);
109   grpc_completion_queue_destroy(f->cq);
110   grpc_completion_queue_destroy(f->shutdown_cq);
111 }
112
113 static void request_response_with_payload(
114     grpc_end2end_test_config config, grpc_end2end_test_fixture f,
115     const char* method_name, const char* request_msg, const char* response_msg,
116     grpc_metadata* initial_lr_metadata, grpc_metadata* trailing_lr_metadata) {
117   grpc_slice request_payload_slice = grpc_slice_from_static_string(request_msg);
118   grpc_slice response_payload_slice =
119       grpc_slice_from_static_string(response_msg);
120   grpc_call* c;
121   grpc_call* s;
122   grpc_byte_buffer* request_payload =
123       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
124   grpc_byte_buffer* response_payload =
125       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
126   cq_verifier* cqv = cq_verifier_create(f.cq);
127   grpc_op ops[6];
128   grpc_op* op;
129   grpc_metadata_array initial_metadata_recv;
130   grpc_metadata_array trailing_metadata_recv;
131   grpc_metadata_array request_metadata_recv;
132   grpc_byte_buffer* request_payload_recv = nullptr;
133   grpc_byte_buffer* response_payload_recv = nullptr;
134   grpc_call_details call_details;
135   grpc_status_code status;
136   grpc_call_error error;
137   grpc_slice details;
138   int was_cancelled = 2;
139
140   gpr_timespec deadline = five_seconds_from_now();
141   c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
142                                grpc_slice_from_static_string(method_name),
143                                nullptr, deadline, nullptr);
144   GPR_ASSERT(c);
145
146   grpc_metadata_array_init(&initial_metadata_recv);
147   grpc_metadata_array_init(&trailing_metadata_recv);
148   grpc_metadata_array_init(&request_metadata_recv);
149   grpc_call_details_init(&call_details);
150
151   memset(ops, 0, sizeof(ops));
152   op = ops;
153   op->op = GRPC_OP_SEND_INITIAL_METADATA;
154   GPR_ASSERT(initial_lr_metadata != nullptr);
155   op->data.send_initial_metadata.count = 1;
156   op->data.send_initial_metadata.metadata = initial_lr_metadata;
157   op->flags = 0;
158   op->reserved = nullptr;
159   op++;
160   op->op = GRPC_OP_SEND_MESSAGE;
161   op->data.send_message.send_message = request_payload;
162   op->flags = 0;
163   op->reserved = nullptr;
164   op++;
165   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
166   op->flags = 0;
167   op->reserved = nullptr;
168   op++;
169   op->op = GRPC_OP_RECV_INITIAL_METADATA;
170   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
171   op->flags = 0;
172   op->reserved = nullptr;
173   op++;
174   op->op = GRPC_OP_RECV_MESSAGE;
175   op->data.recv_message.recv_message = &response_payload_recv;
176   op->flags = 0;
177   op->reserved = nullptr;
178   op++;
179   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
180   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
181   op->data.recv_status_on_client.status = &status;
182   op->data.recv_status_on_client.status_details = &details;
183   op->flags = 0;
184   op->reserved = nullptr;
185   op++;
186   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
187                                 nullptr);
188   GPR_ASSERT(GRPC_CALL_OK == error);
189
190   error =
191       grpc_server_request_call(f.server, &s, &call_details,
192                                &request_metadata_recv, f.cq, f.cq, tag(101));
193   GPR_ASSERT(GRPC_CALL_OK == error);
194   CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
195   cq_verify(cqv);
196
197   memset(ops, 0, sizeof(ops));
198   op = ops;
199   op->op = GRPC_OP_SEND_INITIAL_METADATA;
200   op->data.send_initial_metadata.count = 0;
201   op->flags = 0;
202   op->reserved = nullptr;
203   op++;
204   op->op = GRPC_OP_RECV_MESSAGE;
205   op->data.recv_message.recv_message = &request_payload_recv;
206   op->flags = 0;
207   op->reserved = nullptr;
208   op++;
209   error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
210                                 nullptr);
211   GPR_ASSERT(GRPC_CALL_OK == error);
212
213   CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
214   cq_verify(cqv);
215
216   memset(ops, 0, sizeof(ops));
217   op = ops;
218   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
219   op->data.recv_close_on_server.cancelled = &was_cancelled;
220   op->flags = 0;
221   op->reserved = nullptr;
222   op++;
223   op->op = GRPC_OP_SEND_MESSAGE;
224   op->data.send_message.send_message = response_payload;
225   op->flags = 0;
226   op->reserved = nullptr;
227   op++;
228   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
229   GPR_ASSERT(trailing_lr_metadata != nullptr);
230   op->data.send_status_from_server.trailing_metadata_count = 1;
231   op->data.send_status_from_server.trailing_metadata = trailing_lr_metadata;
232   op->data.send_status_from_server.status = GRPC_STATUS_OK;
233   grpc_slice status_details = grpc_slice_from_static_string("xyz");
234   op->data.send_status_from_server.status_details = &status_details;
235   op->flags = 0;
236   op->reserved = nullptr;
237   op++;
238   error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
239                                 nullptr);
240   GPR_ASSERT(GRPC_CALL_OK == error);
241
242   CQ_EXPECT_COMPLETION(cqv, tag(103), 1);
243   CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
244   cq_verify(cqv);
245
246   GPR_ASSERT(status == GRPC_STATUS_OK);
247
248   grpc_slice_unref(details);
249   grpc_metadata_array_destroy(&initial_metadata_recv);
250   grpc_metadata_array_destroy(&trailing_metadata_recv);
251   grpc_metadata_array_destroy(&request_metadata_recv);
252   grpc_call_details_destroy(&call_details);
253
254   grpc_call_unref(c);
255   grpc_call_unref(s);
256
257   cq_verifier_destroy(cqv);
258
259   grpc_byte_buffer_destroy(request_payload);
260   grpc_byte_buffer_destroy(response_payload);
261   grpc_byte_buffer_destroy(request_payload_recv);
262   grpc_byte_buffer_destroy(response_payload_recv);
263 }
264
265 /* override the default for testing purposes */
266 extern void (*g_load_reporting_fn)(
267     const grpc_load_reporting_call_data* call_data);
268
269 static void test_load_reporting_hook(grpc_end2end_test_config config) {
270   /* TODO(dgq): this test is currently a noop until LR is fully defined.
271    * Leaving the rest here, as it'll likely be reusable. */
272
273   /* Introduce load reporting for the server through its arguments */
274   grpc_arg arg = grpc_load_reporting_enable_arg();
275   grpc_channel_args* lr_server_args =
276       grpc_channel_args_copy_and_add(nullptr, &arg, 1);
277
278   grpc_end2end_test_fixture f =
279       begin_test(config, "test_load_reporting_hook", nullptr, lr_server_args);
280
281   const char* method_name = "/gRPCFTW";
282   const char* request_msg = "the msg from the client";
283   const char* response_msg = "... and the response from the server";
284
285   grpc_metadata initial_lr_metadata;
286   grpc_metadata trailing_lr_metadata;
287
288   initial_lr_metadata.key = GRPC_MDSTR_LB_TOKEN;
289   initial_lr_metadata.value = grpc_slice_from_static_string("client-token");
290   memset(&initial_lr_metadata.internal_data, 0,
291          sizeof(initial_lr_metadata.internal_data));
292
293   trailing_lr_metadata.key = GRPC_MDSTR_LB_COST_BIN;
294   trailing_lr_metadata.value = grpc_slice_from_static_string("server-token");
295   memset(&trailing_lr_metadata.internal_data, 0,
296          sizeof(trailing_lr_metadata.internal_data));
297
298   request_response_with_payload(config, f, method_name, request_msg,
299                                 response_msg, &initial_lr_metadata,
300                                 &trailing_lr_metadata);
301   end_test(&f);
302   {
303     grpc_core::ExecCtx exec_ctx;
304     grpc_channel_args_destroy(lr_server_args);
305   }
306   config.tear_down_data(&f);
307 }
308
309 void load_reporting_hook(grpc_end2end_test_config config) {
310   test_load_reporting_hook(config);
311 }
312
313 void load_reporting_hook_pre_init(void) {}