46af805f3fa9db31357104f33c3ed8b635090fee
[platform/upstream/grpc.git] / src / core / ext / filters / load_reporting / server_load_reporting_filter.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include <string>
24
25 #include "absl/strings/str_format.h"
26
27 #include <grpc/grpc_security.h>
28 #include <grpc/slice.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31
32 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
33 #include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
34 #include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
35 #include "src/core/lib/address_utils/parse_address.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/channel/context.h"
38 #include "src/core/lib/iomgr/resolve_address.h"
39 #include "src/core/lib/iomgr/sockaddr.h"
40 #include "src/core/lib/iomgr/socket_utils.h"
41 #include "src/core/lib/security/context/security_context.h"
42 #include "src/core/lib/slice/slice_internal.h"
43 #include "src/core/lib/surface/call.h"
44 #include "src/core/lib/uri/uri_parser.h"
45
46 namespace grpc {
47
// Two-character decimal prefixes written in front of the hex-encoded client IP
// in the load reporting token tag. The value is the number of hex characters
// that follow: none, 8 for IPv4, or 32 for IPv6.
constexpr char kEmptyAddressLengthString[] = "00";
constexpr char kEncodedIpv4AddressLengthString[] = "08";
constexpr char kEncodedIpv6AddressLengthString[] = "32";
// Number of characters in each of the prefixes above (excluding the NUL).
constexpr size_t kLengthPrefixSize = sizeof(kEmptyAddressLengthString) - 1;
52
53 grpc_error_handle ServerLoadReportingChannelData::Init(
54     grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
55   GPR_ASSERT(!args->is_last);
56   // Find and record the peer_identity.
57   const grpc_auth_context* auth_context =
58       grpc_find_auth_context_in_args(args->channel_args);
59   if (auth_context != nullptr &&
60       grpc_auth_context_peer_is_authenticated(auth_context)) {
61     grpc_auth_property_iterator auth_it =
62         grpc_auth_context_peer_identity(auth_context);
63     const grpc_auth_property* auth_property =
64         grpc_auth_property_iterator_next(&auth_it);
65     if (auth_property != nullptr) {
66       peer_identity_ = auth_property->value;
67       peer_identity_len_ = auth_property->value_length;
68     }
69   }
70   return GRPC_ERROR_NONE;
71 }
72
73 void ServerLoadReportingCallData::Destroy(
74     grpc_call_element* elem, const grpc_call_final_info* final_info,
75     grpc_closure* /*then_call_closure*/) {
76   ServerLoadReportingChannelData* chand =
77       reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
78   // Only record an end if we've recorded its corresponding start, which is
79   // indicated by a non-null client_ip_and_lr_token_. Note that it's possible
80   // that we attempt to record the call end before we have recorded the call
81   // start, because the data needed for recording the start comes from the
82   // initial metadata, which may not be ready before the call finishes.
83   if (client_ip_and_lr_token_ != nullptr) {
84     opencensus::stats::Record(
85         {{::grpc::load_reporter::MeasureEndCount(), 1},
86          {::grpc::load_reporter::MeasureEndBytesSent(),
87           final_info->stats.transport_stream_stats.outgoing.data_bytes},
88          {::grpc::load_reporter::MeasureEndBytesReceived(),
89           final_info->stats.transport_stream_stats.incoming.data_bytes},
90          {::grpc::load_reporter::MeasureEndLatencyMs(),
91           gpr_time_to_millis(final_info->stats.latency)}},
92         {{::grpc::load_reporter::TagKeyToken(),
93           {client_ip_and_lr_token_, client_ip_and_lr_token_len_}},
94          {::grpc::load_reporter::TagKeyHost(),
95           {target_host_, target_host_len_}},
96          {::grpc::load_reporter::TagKeyUserId(),
97           {chand->peer_identity(), chand->peer_identity_len()}},
98          {::grpc::load_reporter::TagKeyStatus(),
99           GetStatusTagForStatus(final_info->final_status)}});
100     gpr_free(client_ip_and_lr_token_);
101   }
102   gpr_free(target_host_);
103   grpc_slice_unref_internal(service_method_);
104 }
105
106 void ServerLoadReportingCallData::StartTransportStreamOpBatch(
107     grpc_call_element* elem, TransportStreamOpBatch* op) {
108   GPR_TIMER_SCOPE("lr_start_transport_stream_op", 0);
109   if (op->recv_initial_metadata() != nullptr) {
110     // Save some fields to use when initial metadata is ready.
111     peer_string_ = op->get_peer_string();
112     recv_initial_metadata_ =
113         op->op()->payload->recv_initial_metadata.recv_initial_metadata;
114     original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
115     // Substitute the original closure for the wrapper closure.
116     op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
117   } else if (op->send_trailing_metadata() != nullptr) {
118     GRPC_LOG_IF_ERROR(
119         "server_load_reporting_filter",
120         grpc_metadata_batch_filter(op->send_trailing_metadata()->batch(),
121                                    SendTrailingMetadataFilter, elem,
122                                    "send_trailing_metadata filtering error"));
123   }
124   grpc_call_next_op(elem, op->op());
125 }
126
127 std::string ServerLoadReportingCallData::GetCensusSafeClientIpString() {
128   // Find the client URI string.
129   const char* client_uri_str =
130       reinterpret_cast<const char*>(gpr_atm_acq_load(peer_string_));
131   if (client_uri_str == nullptr) {
132     gpr_log(GPR_ERROR,
133             "Unable to extract client URI string (peer string) from gRPC "
134             "metadata.");
135     return "";
136   }
137   absl::StatusOr<grpc_core::URI> client_uri =
138       grpc_core::URI::Parse(client_uri_str);
139   if (!client_uri.ok()) {
140     gpr_log(GPR_ERROR,
141             "Unable to parse the client URI string (peer string) to a client "
142             "URI. Error: %s",
143             client_uri.status().ToString().c_str());
144     return "";
145   }
146   // Parse the client URI into grpc_resolved_address.
147   grpc_resolved_address resolved_address;
148   bool success = grpc_parse_uri(*client_uri, &resolved_address);
149   if (!success) {
150     gpr_log(GPR_ERROR,
151             "Unable to parse client URI into a grpc_resolved_address.");
152     return "";
153   }
154   // Convert the socket address in the grpc_resolved_address into a hex string
155   // according to the address family.
156   grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_address.addr);
157   if (addr->sa_family == GRPC_AF_INET) {
158     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(addr);
159     return absl::StrFormat("%08x", grpc_ntohl(addr4->sin_addr.s_addr));
160   } else if (addr->sa_family == GRPC_AF_INET6) {
161     grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
162     std::string client_ip;
163     client_ip.reserve(32);
164     uint32_t* addr6_next_long = reinterpret_cast<uint32_t*>(&addr6->sin6_addr);
165     for (size_t i = 0; i < 4; ++i) {
166       absl::StrAppendFormat(&client_ip, "%08x", grpc_ntohl(*addr6_next_long++));
167     }
168     return client_ip;
169   } else {
170     GPR_UNREACHABLE_CODE();
171   }
172 }
173
174 void ServerLoadReportingCallData::StoreClientIpAndLrToken(const char* lr_token,
175                                                           size_t lr_token_len) {
176   std::string client_ip = GetCensusSafeClientIpString();
177   client_ip_and_lr_token_len_ =
178       kLengthPrefixSize + client_ip.size() + lr_token_len;
179   client_ip_and_lr_token_ = static_cast<char*>(
180       gpr_zalloc(client_ip_and_lr_token_len_ * sizeof(char)));
181   char* cur_pos = client_ip_and_lr_token_;
182   // Store the IP length prefix.
183   if (client_ip.empty()) {
184     strncpy(cur_pos, kEmptyAddressLengthString, kLengthPrefixSize);
185   } else if (client_ip.size() == 8) {
186     strncpy(cur_pos, kEncodedIpv4AddressLengthString, kLengthPrefixSize);
187   } else if (client_ip.size() == 32) {
188     strncpy(cur_pos, kEncodedIpv6AddressLengthString, kLengthPrefixSize);
189   } else {
190     GPR_UNREACHABLE_CODE();
191   }
192   cur_pos += kLengthPrefixSize;
193   // Store the IP.
194   if (!client_ip.empty()) {
195     strncpy(cur_pos, client_ip.c_str(), client_ip.size());
196   }
197   cur_pos += client_ip.size();
198   // Store the LR token.
199   if (lr_token_len != 0) {
200     strncpy(cur_pos, lr_token, lr_token_len);
201   }
202   GPR_ASSERT(cur_pos + lr_token_len - client_ip_and_lr_token_ ==
203              long(client_ip_and_lr_token_len_));
204 }
205
206 grpc_filtered_mdelem ServerLoadReportingCallData::RecvInitialMetadataFilter(
207     void* user_data, grpc_mdelem md) {
208   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
209   ServerLoadReportingCallData* calld =
210       reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
211   if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
212     calld->service_method_ = grpc_slice_ref_internal(GRPC_MDVALUE(md));
213   } else if (calld->target_host_ == nullptr &&
214              grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) {
215     grpc_slice target_host_slice = GRPC_MDVALUE(md);
216     calld->target_host_len_ = GRPC_SLICE_LENGTH(target_host_slice);
217     calld->target_host_ =
218         reinterpret_cast<char*>(gpr_zalloc(calld->target_host_len_));
219     for (size_t i = 0; i < calld->target_host_len_; ++i) {
220       calld->target_host_[i] = static_cast<char>(
221           tolower(GRPC_SLICE_START_PTR(target_host_slice)[i]));
222     }
223   } else if (grpc_slice_str_cmp(GRPC_MDKEY(md),
224                                 grpc_core::kGrpcLbLbTokenMetadataKey) == 0) {
225     if (calld->client_ip_and_lr_token_ == nullptr) {
226       calld->StoreClientIpAndLrToken(
227           reinterpret_cast<const char*> GRPC_SLICE_START_PTR(GRPC_MDVALUE(md)),
228           GRPC_SLICE_LENGTH(GRPC_MDVALUE(md)));
229     }
230     return GRPC_FILTERED_REMOVE();
231   }
232   return GRPC_FILTERED_MDELEM(md);
233 }
234
235 void ServerLoadReportingCallData::RecvInitialMetadataReady(
236     void* arg, grpc_error_handle err) {
237   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(arg);
238   ServerLoadReportingCallData* calld =
239       reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
240   ServerLoadReportingChannelData* chand =
241       reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
242   if (err == GRPC_ERROR_NONE) {
243     GRPC_LOG_IF_ERROR(
244         "server_load_reporting_filter",
245         grpc_metadata_batch_filter(calld->recv_initial_metadata_,
246                                    RecvInitialMetadataFilter, elem,
247                                    "recv_initial_metadata filtering error"));
248     // If the LB token was not found in the recv_initial_metadata, only the
249     // client IP part will be recorded (with an empty LB token).
250     if (calld->client_ip_and_lr_token_ == nullptr) {
251       calld->StoreClientIpAndLrToken(nullptr, 0);
252     }
253     opencensus::stats::Record(
254         {{::grpc::load_reporter::MeasureStartCount(), 1}},
255         {{::grpc::load_reporter::TagKeyToken(),
256           {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
257          {::grpc::load_reporter::TagKeyHost(),
258           {calld->target_host_, calld->target_host_len_}},
259          {::grpc::load_reporter::TagKeyUserId(),
260           {chand->peer_identity(), chand->peer_identity_len()}}});
261   }
262   grpc_core::Closure::Run(DEBUG_LOCATION,
263                           calld->original_recv_initial_metadata_ready_,
264                           GRPC_ERROR_REF(err));
265 }
266
267 grpc_error_handle ServerLoadReportingCallData::Init(
268     grpc_call_element* elem, const grpc_call_element_args* /*args*/) {
269   service_method_ = grpc_empty_slice();
270   GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
271                     elem, grpc_schedule_on_exec_ctx);
272   return GRPC_ERROR_NONE;
273 }
274
275 grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
276     void* user_data, grpc_mdelem md) {
277   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
278   ServerLoadReportingCallData* calld =
279       reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
280   ServerLoadReportingChannelData* chand =
281       reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
282   if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
283     const grpc_slice value = GRPC_MDVALUE(md);
284     const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
285     if (cost_entry_size < sizeof(double)) {
286       gpr_log(GPR_ERROR,
287               "Cost metadata value too small (%zu bytes) to hold valid data. "
288               "Ignoring.",
289               cost_entry_size);
290       return GRPC_FILTERED_REMOVE();
291     }
292     const double* cost_entry_ptr =
293         reinterpret_cast<const double*>(GRPC_SLICE_START_PTR(value));
294     double cost_value = *cost_entry_ptr++;
295     const char* cost_name = reinterpret_cast<const char*>(cost_entry_ptr);
296     const size_t cost_name_len = cost_entry_size - sizeof(double);
297     opencensus::stats::Record(
298         {{::grpc::load_reporter::MeasureOtherCallMetric(), cost_value}},
299         {{::grpc::load_reporter::TagKeyToken(),
300           {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
301          {::grpc::load_reporter::TagKeyHost(),
302           {calld->target_host_, calld->target_host_len_}},
303          {::grpc::load_reporter::TagKeyUserId(),
304           {chand->peer_identity(), chand->peer_identity_len()}},
305          {::grpc::load_reporter::TagKeyMetricName(),
306           {cost_name, cost_name_len}}});
307     return GRPC_FILTERED_REMOVE();
308   }
309   return GRPC_FILTERED_MDELEM(md);
310 }
311
312 const char* ServerLoadReportingCallData::GetStatusTagForStatus(
313     grpc_status_code status) {
314   switch (status) {
315     case GRPC_STATUS_OK:
316       return ::grpc::load_reporter::kCallStatusOk;
317     case GRPC_STATUS_UNKNOWN:
318     case GRPC_STATUS_DEADLINE_EXCEEDED:
319     case GRPC_STATUS_UNIMPLEMENTED:
320     case GRPC_STATUS_INTERNAL:
321     case GRPC_STATUS_UNAVAILABLE:
322     case GRPC_STATUS_DATA_LOSS:
323       return ::grpc::load_reporter::kCallStatusServerError;
324     default:
325       return ::grpc::load_reporter::kCallStatusClientError;
326   }
327 }
328
329 namespace {
330 bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
331   return grpc_channel_arg_get_bool(
332       grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
333 }
334 }  // namespace
335
336 // TODO(juanlishen): We should register the filter during grpc initialization
337 // time once OpenCensus is compatible with our build system. For now, we force
338 // registration of the server load reporting filter at static initialization
339 // time if we build with the filter target.
340 struct ServerLoadReportingFilterStaticRegistrar {
341   ServerLoadReportingFilterStaticRegistrar() {
342     static grpc_core::Atomic<bool> registered{false};
343     if (registered.Load(grpc_core::MemoryOrder::ACQUIRE)) return;
344     RegisterChannelFilter<ServerLoadReportingChannelData,
345                           ServerLoadReportingCallData>(
346         "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
347         MaybeAddServerLoadReportingFilter);
348     // Access measures to ensure they are initialized. Otherwise, we can't
349     // create any valid view before the first RPC.
350     ::grpc::load_reporter::MeasureStartCount();
351     ::grpc::load_reporter::MeasureEndCount();
352     ::grpc::load_reporter::MeasureEndBytesSent();
353     ::grpc::load_reporter::MeasureEndBytesReceived();
354     ::grpc::load_reporter::MeasureEndLatencyMs();
355     ::grpc::load_reporter::MeasureOtherCallMetric();
356     registered.Store(true, grpc_core::MemoryOrder::RELEASE);
357   }
358 } server_load_reporting_filter_static_registrar;
359
360 }  // namespace grpc