Imported Upstream version 1.37.0
[platform/upstream/grpc.git] / src / core / lib / channel / channelz.cc
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/impl/codegen/port_platform.h>
20
21 #include "src/core/lib/channel/channelz.h"
22 #include "src/core/lib/iomgr/resolve_address.h"
23 #include "src/core/lib/iomgr/sockaddr_utils.h"
24
25 #include "absl/strings/escaping.h"
26 #include "absl/strings/strip.h"
27
28 #include <grpc/grpc.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35
36 #include "src/core/lib/channel/channelz_registry.h"
37 #include "src/core/lib/channel/status_util.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/gpr/useful.h"
40 #include "src/core/lib/gprpp/atomic.h"
41 #include "src/core/lib/gprpp/host_port.h"
42 #include "src/core/lib/gprpp/memory.h"
43 #include "src/core/lib/iomgr/error.h"
44 #include "src/core/lib/iomgr/exec_ctx.h"
45 #include "src/core/lib/slice/b64.h"
46 #include "src/core/lib/slice/slice_internal.h"
47 #include "src/core/lib/surface/channel.h"
48 #include "src/core/lib/surface/server.h"
49 #include "src/core/lib/transport/connectivity_state.h"
50 #include "src/core/lib/transport/error_utils.h"
51 #include "src/core/lib/uri/uri_parser.h"
52
53 namespace grpc_core {
54 namespace channelz {
55
56 //
57 // BaseNode
58 //
59
60 BaseNode::BaseNode(EntityType type, std::string name)
61     : type_(type), uuid_(-1), name_(std::move(name)) {
62   // The registry will set uuid_ under its lock.
63   ChannelzRegistry::Register(this);
64 }
65
66 BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); }
67
68 std::string BaseNode::RenderJsonString() {
69   Json json = RenderJson();
70   return json.Dump();
71 }
72
73 //
74 // CallCountingHelper
75 //
76
77 CallCountingHelper::CallCountingHelper() {
78   num_cores_ = GPR_MAX(1, gpr_cpu_num_cores());
79   per_cpu_counter_data_storage_.reserve(num_cores_);
80   for (size_t i = 0; i < num_cores_; ++i) {
81     per_cpu_counter_data_storage_.emplace_back();
82   }
83 }
84
85 void CallCountingHelper::RecordCallStarted() {
86   AtomicCounterData& data =
87       per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()];
88   data.calls_started.FetchAdd(1, MemoryOrder::RELAXED);
89   data.last_call_started_cycle.Store(gpr_get_cycle_counter(),
90                                      MemoryOrder::RELAXED);
91 }
92
93 void CallCountingHelper::RecordCallFailed() {
94   per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]
95       .calls_failed.FetchAdd(1, MemoryOrder::RELAXED);
96 }
97
98 void CallCountingHelper::RecordCallSucceeded() {
99   per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]
100       .calls_succeeded.FetchAdd(1, MemoryOrder::RELAXED);
101 }
102
103 void CallCountingHelper::CollectData(CounterData* out) {
104   for (size_t core = 0; core < num_cores_; ++core) {
105     AtomicCounterData& data = per_cpu_counter_data_storage_[core];
106
107     out->calls_started += data.calls_started.Load(MemoryOrder::RELAXED);
108     out->calls_succeeded +=
109         per_cpu_counter_data_storage_[core].calls_succeeded.Load(
110             MemoryOrder::RELAXED);
111     out->calls_failed += per_cpu_counter_data_storage_[core].calls_failed.Load(
112         MemoryOrder::RELAXED);
113     const gpr_cycle_counter last_call =
114         per_cpu_counter_data_storage_[core].last_call_started_cycle.Load(
115             MemoryOrder::RELAXED);
116     if (last_call > out->last_call_started_cycle) {
117       out->last_call_started_cycle = last_call;
118     }
119   }
120 }
121
122 void CallCountingHelper::PopulateCallCounts(Json::Object* json) {
123   CounterData data;
124   CollectData(&data);
125   if (data.calls_started != 0) {
126     (*json)["callsStarted"] = std::to_string(data.calls_started);
127     gpr_timespec ts = gpr_convert_clock_type(
128         gpr_cycle_counter_to_time(data.last_call_started_cycle),
129         GPR_CLOCK_REALTIME);
130     (*json)["lastCallStartedTimestamp"] = gpr_format_timespec(ts);
131   }
132   if (data.calls_succeeded != 0) {
133     (*json)["callsSucceeded"] = std::to_string(data.calls_succeeded);
134   }
135   if (data.calls_failed) {
136     (*json)["callsFailed"] = std::to_string(data.calls_failed);
137   }
138 }
139
140 //
141 // ChannelNode
142 //
143
144 ChannelNode::ChannelNode(std::string target, size_t channel_tracer_max_nodes,
145                          bool is_internal_channel)
146     : BaseNode(is_internal_channel ? EntityType::kInternalChannel
147                                    : EntityType::kTopLevelChannel,
148                target),
149       target_(std::move(target)),
150       trace_(channel_tracer_max_nodes) {}
151
152 const char* ChannelNode::GetChannelConnectivityStateChangeString(
153     grpc_connectivity_state state) {
154   switch (state) {
155     case GRPC_CHANNEL_IDLE:
156       return "Channel state change to IDLE";
157     case GRPC_CHANNEL_CONNECTING:
158       return "Channel state change to CONNECTING";
159     case GRPC_CHANNEL_READY:
160       return "Channel state change to READY";
161     case GRPC_CHANNEL_TRANSIENT_FAILURE:
162       return "Channel state change to TRANSIENT_FAILURE";
163     case GRPC_CHANNEL_SHUTDOWN:
164       return "Channel state change to SHUTDOWN";
165   }
166   GPR_UNREACHABLE_CODE(return "UNKNOWN");
167 }
168
169 Json ChannelNode::RenderJson() {
170   Json::Object data = {
171       {"target", target_},
172   };
173   // Connectivity state.
174   // If low-order bit is on, then the field is set.
175   int state_field = connectivity_state_.Load(MemoryOrder::RELAXED);
176   if ((state_field & 1) != 0) {
177     grpc_connectivity_state state =
178         static_cast<grpc_connectivity_state>(state_field >> 1);
179     data["state"] = Json::Object{
180         {"state", ConnectivityStateName(state)},
181     };
182   }
183   // Fill in the channel trace if applicable.
184   Json trace_json = trace_.RenderJson();
185   if (trace_json.type() != Json::Type::JSON_NULL) {
186     data["trace"] = std::move(trace_json);
187   }
188   // Ask CallCountingHelper to populate call count data.
189   call_counter_.PopulateCallCounts(&data);
190   // Construct outer object.
191   Json::Object json = {
192       {"ref",
193        Json::Object{
194            {"channelId", std::to_string(uuid())},
195        }},
196       {"data", std::move(data)},
197   };
198   // Template method. Child classes may override this to add their specific
199   // functionality.
200   PopulateChildRefs(&json);
201   return json;
202 }
203
204 void ChannelNode::PopulateChildRefs(Json::Object* json) {
205   MutexLock lock(&child_mu_);
206   if (!child_subchannels_.empty()) {
207     Json::Array array;
208     for (intptr_t subchannel_uuid : child_subchannels_) {
209       array.emplace_back(Json::Object{
210           {"subchannelId", std::to_string(subchannel_uuid)},
211       });
212     }
213     (*json)["subchannelRef"] = std::move(array);
214   }
215   if (!child_channels_.empty()) {
216     Json::Array array;
217     for (intptr_t channel_uuid : child_channels_) {
218       array.emplace_back(Json::Object{
219           {"channelId", std::to_string(channel_uuid)},
220       });
221     }
222     (*json)["channelRef"] = std::move(array);
223   }
224 }
225
226 void ChannelNode::SetConnectivityState(grpc_connectivity_state state) {
227   // Store with low-order bit set to indicate that the field is set.
228   int state_field = (state << 1) + 1;
229   connectivity_state_.Store(state_field, MemoryOrder::RELAXED);
230 }
231
232 void ChannelNode::AddChildChannel(intptr_t child_uuid) {
233   MutexLock lock(&child_mu_);
234   child_channels_.insert(child_uuid);
235 }
236
237 void ChannelNode::RemoveChildChannel(intptr_t child_uuid) {
238   MutexLock lock(&child_mu_);
239   child_channels_.erase(child_uuid);
240 }
241
242 void ChannelNode::AddChildSubchannel(intptr_t child_uuid) {
243   MutexLock lock(&child_mu_);
244   child_subchannels_.insert(child_uuid);
245 }
246
247 void ChannelNode::RemoveChildSubchannel(intptr_t child_uuid) {
248   MutexLock lock(&child_mu_);
249   child_subchannels_.erase(child_uuid);
250 }
251
252 //
253 // ServerNode
254 //
255
256 ServerNode::ServerNode(size_t channel_tracer_max_nodes)
257     : BaseNode(EntityType::kServer, ""), trace_(channel_tracer_max_nodes) {}
258
259 ServerNode::~ServerNode() {}
260
261 void ServerNode::AddChildSocket(RefCountedPtr<SocketNode> node) {
262   MutexLock lock(&child_mu_);
263   child_sockets_.insert(std::make_pair(node->uuid(), std::move(node)));
264 }
265
266 void ServerNode::RemoveChildSocket(intptr_t child_uuid) {
267   MutexLock lock(&child_mu_);
268   child_sockets_.erase(child_uuid);
269 }
270
271 void ServerNode::AddChildListenSocket(RefCountedPtr<ListenSocketNode> node) {
272   MutexLock lock(&child_mu_);
273   child_listen_sockets_.insert(std::make_pair(node->uuid(), std::move(node)));
274 }
275
276 void ServerNode::RemoveChildListenSocket(intptr_t child_uuid) {
277   MutexLock lock(&child_mu_);
278   child_listen_sockets_.erase(child_uuid);
279 }
280
281 std::string ServerNode::RenderServerSockets(intptr_t start_socket_id,
282                                             intptr_t max_results) {
283   GPR_ASSERT(start_socket_id >= 0);
284   GPR_ASSERT(max_results >= 0);
285   // If user does not set max_results, we choose 500.
286   size_t pagination_limit = max_results == 0 ? 500 : max_results;
287   Json::Object object;
288   {
289     MutexLock lock(&child_mu_);
290     size_t sockets_rendered = 0;
291     // Create list of socket refs.
292     Json::Array array;
293     auto it = child_sockets_.lower_bound(start_socket_id);
294     for (; it != child_sockets_.end() && sockets_rendered < pagination_limit;
295          ++it, ++sockets_rendered) {
296       array.emplace_back(Json::Object{
297           {"socketId", std::to_string(it->first)},
298           {"name", it->second->name()},
299       });
300     }
301     object["socketRef"] = std::move(array);
302     if (it == child_sockets_.end()) object["end"] = true;
303   }
304   Json json = std::move(object);
305   return json.Dump();
306 }
307
308 Json ServerNode::RenderJson() {
309   Json::Object data;
310   // Fill in the channel trace if applicable.
311   Json trace_json = trace_.RenderJson();
312   if (trace_json.type() != Json::Type::JSON_NULL) {
313     data["trace"] = std::move(trace_json);
314   }
315   // Ask CallCountingHelper to populate call count data.
316   call_counter_.PopulateCallCounts(&data);
317   // Construct top-level object.
318   Json::Object object = {
319       {"ref",
320        Json::Object{
321            {"serverId", std::to_string(uuid())},
322        }},
323       {"data", std::move(data)},
324   };
325   // Render listen sockets.
326   {
327     MutexLock lock(&child_mu_);
328     if (!child_listen_sockets_.empty()) {
329       Json::Array array;
330       for (const auto& it : child_listen_sockets_) {
331         array.emplace_back(Json::Object{
332             {"socketId", std::to_string(it.first)},
333             {"name", it.second->name()},
334         });
335       }
336       object["listenSocket"] = std::move(array);
337     }
338   }
339   return object;
340 }
341
342 //
343 // SocketNode::Security::Tls
344 //
345
346 Json SocketNode::Security::Tls::RenderJson() {
347   Json::Object data;
348   if (type == NameType::kStandardName) {
349     data["standard_name"] = name;
350   } else if (type == NameType::kOtherName) {
351     data["other_name"] = name;
352   }
353   if (!local_certificate.empty()) {
354     data["local_certificate"] = absl::Base64Escape(local_certificate);
355   }
356   if (!remote_certificate.empty()) {
357     data["remote_certificate"] = absl::Base64Escape(remote_certificate);
358   }
359   return data;
360 }
361
362 //
363 // SocketNode::Security
364 //
365
366 Json SocketNode::Security::RenderJson() {
367   Json::Object data;
368   switch (type) {
369     case ModelType::kUnset:
370       break;
371     case ModelType::kTls:
372       if (tls) {
373         data["tls"] = tls->RenderJson();
374       }
375       break;
376     case ModelType::kOther:
377       if (other) {
378         data["other"] = *other;
379       }
380       break;
381   }
382   return data;
383 }
384
385 namespace {
386
387 void* SecurityArgCopy(void* p) {
388   SocketNode::Security* xds_certificate_provider =
389       static_cast<SocketNode::Security*>(p);
390   return xds_certificate_provider->Ref().release();
391 }
392
393 void SecurityArgDestroy(void* p) {
394   SocketNode::Security* xds_certificate_provider =
395       static_cast<SocketNode::Security*>(p);
396   xds_certificate_provider->Unref();
397 }
398
399 int SecurityArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
400
401 const grpc_arg_pointer_vtable kChannelArgVtable = {
402     SecurityArgCopy, SecurityArgDestroy, SecurityArgCmp};
403
404 }  // namespace
405
406 grpc_arg SocketNode::Security::MakeChannelArg() const {
407   return grpc_channel_arg_pointer_create(
408       const_cast<char*>(GRPC_ARG_CHANNELZ_SECURITY),
409       const_cast<SocketNode::Security*>(this), &kChannelArgVtable);
410 }
411
412 RefCountedPtr<SocketNode::Security> SocketNode::Security::GetFromChannelArgs(
413     const grpc_channel_args* args) {
414   Security* security = grpc_channel_args_find_pointer<Security>(
415       args, GRPC_ARG_CHANNELZ_SECURITY);
416   return security != nullptr ? security->Ref() : nullptr;
417 }
418
419 //
420 // SocketNode
421 //
422
423 namespace {
424
425 void PopulateSocketAddressJson(Json::Object* json, const char* name,
426                                const char* addr_str) {
427   if (addr_str == nullptr) return;
428   Json::Object data;
429   absl::StatusOr<URI> uri = URI::Parse(addr_str);
430   if (uri.ok() && (uri->scheme() == "ipv4" || uri->scheme() == "ipv6")) {
431     std::string host;
432     std::string port;
433     GPR_ASSERT(
434         SplitHostPort(absl::StripPrefix(uri->path(), "/"), &host, &port));
435     int port_num = -1;
436     if (!port.empty()) {
437       port_num = atoi(port.data());
438     }
439     grpc_resolved_address resolved_host;
440     grpc_string_to_sockaddr(&resolved_host, host.c_str(), port_num);
441     std::string packed_host = grpc_sockaddr_get_packed_host(&resolved_host);
442     std::string b64_host = absl::Base64Escape(packed_host);
443     data["tcpip_address"] = Json::Object{
444         {"port", port_num},
445         {"ip_address", b64_host},
446     };
447   } else if (uri.ok() && uri->scheme() == "unix") {
448     data["uds_address"] = Json::Object{
449         {"filename", uri->path()},
450     };
451   } else {
452     data["other_address"] = Json::Object{
453         {"name", addr_str},
454     };
455   }
456   (*json)[name] = std::move(data);
457 }
458
459 }  // namespace
460
461 SocketNode::SocketNode(std::string local, std::string remote, std::string name,
462                        RefCountedPtr<Security> security)
463     : BaseNode(EntityType::kSocket, std::move(name)),
464       local_(std::move(local)),
465       remote_(std::move(remote)),
466       security_(std::move(security)) {}
467
468 void SocketNode::RecordStreamStartedFromLocal() {
469   streams_started_.FetchAdd(1, MemoryOrder::RELAXED);
470   last_local_stream_created_cycle_.Store(gpr_get_cycle_counter(),
471                                          MemoryOrder::RELAXED);
472 }
473
474 void SocketNode::RecordStreamStartedFromRemote() {
475   streams_started_.FetchAdd(1, MemoryOrder::RELAXED);
476   last_remote_stream_created_cycle_.Store(gpr_get_cycle_counter(),
477                                           MemoryOrder::RELAXED);
478 }
479
480 void SocketNode::RecordMessagesSent(uint32_t num_sent) {
481   messages_sent_.FetchAdd(num_sent, MemoryOrder::RELAXED);
482   last_message_sent_cycle_.Store(gpr_get_cycle_counter(), MemoryOrder::RELAXED);
483 }
484
485 void SocketNode::RecordMessageReceived() {
486   messages_received_.FetchAdd(1, MemoryOrder::RELAXED);
487   last_message_received_cycle_.Store(gpr_get_cycle_counter(),
488                                      MemoryOrder::RELAXED);
489 }
490
491 Json SocketNode::RenderJson() {
492   // Create and fill the data child.
493   Json::Object data;
494   gpr_timespec ts;
495   int64_t streams_started = streams_started_.Load(MemoryOrder::RELAXED);
496   if (streams_started != 0) {
497     data["streamsStarted"] = std::to_string(streams_started);
498     gpr_cycle_counter last_local_stream_created_cycle =
499         last_local_stream_created_cycle_.Load(MemoryOrder::RELAXED);
500     if (last_local_stream_created_cycle != 0) {
501       ts = gpr_convert_clock_type(
502           gpr_cycle_counter_to_time(last_local_stream_created_cycle),
503           GPR_CLOCK_REALTIME);
504       data["lastLocalStreamCreatedTimestamp"] = gpr_format_timespec(ts);
505     }
506     gpr_cycle_counter last_remote_stream_created_cycle =
507         last_remote_stream_created_cycle_.Load(MemoryOrder::RELAXED);
508     if (last_remote_stream_created_cycle != 0) {
509       ts = gpr_convert_clock_type(
510           gpr_cycle_counter_to_time(last_remote_stream_created_cycle),
511           GPR_CLOCK_REALTIME);
512       data["lastRemoteStreamCreatedTimestamp"] = gpr_format_timespec(ts);
513     }
514   }
515   int64_t streams_succeeded = streams_succeeded_.Load(MemoryOrder::RELAXED);
516   if (streams_succeeded != 0) {
517     data["streamsSucceeded"] = std::to_string(streams_succeeded);
518   }
519   int64_t streams_failed = streams_failed_.Load(MemoryOrder::RELAXED);
520   if (streams_failed != 0) {
521     data["streamsFailed"] = std::to_string(streams_failed);
522   }
523   int64_t messages_sent = messages_sent_.Load(MemoryOrder::RELAXED);
524   if (messages_sent != 0) {
525     data["messagesSent"] = std::to_string(messages_sent);
526     ts = gpr_convert_clock_type(
527         gpr_cycle_counter_to_time(
528             last_message_sent_cycle_.Load(MemoryOrder::RELAXED)),
529         GPR_CLOCK_REALTIME);
530     data["lastMessageSentTimestamp"] = gpr_format_timespec(ts);
531   }
532   int64_t messages_received = messages_received_.Load(MemoryOrder::RELAXED);
533   if (messages_received != 0) {
534     data["messagesReceived"] = std::to_string(messages_received);
535     ts = gpr_convert_clock_type(
536         gpr_cycle_counter_to_time(
537             last_message_received_cycle_.Load(MemoryOrder::RELAXED)),
538         GPR_CLOCK_REALTIME);
539     data["lastMessageReceivedTimestamp"] = gpr_format_timespec(ts);
540   }
541   int64_t keepalives_sent = keepalives_sent_.Load(MemoryOrder::RELAXED);
542   if (keepalives_sent != 0) {
543     data["keepAlivesSent"] = std::to_string(keepalives_sent);
544   }
545   // Create and fill the parent object.
546   Json::Object object = {
547       {"ref",
548        Json::Object{
549            {"socketId", std::to_string(uuid())},
550            {"name", name()},
551        }},
552       {"data", std::move(data)},
553   };
554   if (security_ != nullptr &&
555       security_->type != SocketNode::Security::ModelType::kUnset) {
556     object["security"] = security_->RenderJson();
557   }
558   PopulateSocketAddressJson(&object, "remote", remote_.c_str());
559   PopulateSocketAddressJson(&object, "local", local_.c_str());
560   return object;
561 }
562
563 //
564 // ListenSocketNode
565 //
566
567 ListenSocketNode::ListenSocketNode(std::string local_addr, std::string name)
568     : BaseNode(EntityType::kSocket, std::move(name)),
569       local_addr_(std::move(local_addr)) {}
570
571 Json ListenSocketNode::RenderJson() {
572   Json::Object object = {
573       {"ref",
574        Json::Object{
575            {"socketId", std::to_string(uuid())},
576            {"name", name()},
577        }},
578   };
579   PopulateSocketAddressJson(&object, "local", local_addr_.c_str());
580   return object;
581 }
582
583 }  // namespace channelz
584 }  // namespace grpc_core