3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
28 #include "absl/strings/str_format.h"
29 #include "absl/strings/str_join.h"
30 #include "gflags/gflags.h"
31 #include "google/protobuf/text_format.h"
32 #include "grpc/grpc.h"
33 #include "grpc/support/port_platform.h"
34 #include "grpcpp/channel.h"
35 #include "grpcpp/client_context.h"
36 #include "grpcpp/create_channel.h"
37 #include "grpcpp/ext/channelz_service_plugin.h"
38 #include "grpcpp/grpcpp.h"
39 #include "grpcpp/security/credentials.h"
40 #include "grpcpp/security/server_credentials.h"
41 #include "grpcpp/server.h"
42 #include "grpcpp/server_builder.h"
43 #include "grpcpp/server_context.h"
44 #include "src/core/lib/json/json.h"
45 #include "src/cpp/server/channelz/channelz_service.h"
46 #include "src/proto/grpc/channelz/channelz.pb.h"
47 #include "test/core/util/test_config.h"
48 #include "test/cpp/util/test_config.h"
49 #include "test/cpp/util/test_credentials_provider.h"
51 DEFINE_string(server_address, "", "channelz server address");
52 DEFINE_string(custom_credentials_type, "", "custom credentials type");
53 DEFINE_int64(sampling_times, 1, "number of sampling");
54 DEFINE_int64(sampling_interval_seconds, 0, "sampling interval in seconds");
55 DEFINE_string(output_json, "", "output filename in json format");
58 using grpc::ClientContext;
60 using grpc::StatusCode;
61 using grpc::channelz::v1::GetChannelRequest;
62 using grpc::channelz::v1::GetChannelResponse;
63 using grpc::channelz::v1::GetServerRequest;
64 using grpc::channelz::v1::GetServerResponse;
65 using grpc::channelz::v1::GetServerSocketsRequest;
66 using grpc::channelz::v1::GetServerSocketsResponse;
67 using grpc::channelz::v1::GetServersRequest;
68 using grpc::channelz::v1::GetServersResponse;
69 using grpc::channelz::v1::GetSocketRequest;
70 using grpc::channelz::v1::GetSocketResponse;
71 using grpc::channelz::v1::GetSubchannelRequest;
72 using grpc::channelz::v1::GetSubchannelResponse;
73 using grpc::channelz::v1::GetTopChannelsRequest;
74 using grpc::channelz::v1::GetTopChannelsResponse;
77 class ChannelzSampler final {
79 // Get server_id of a server
80 int64_t GetServerID(const grpc::channelz::v1::Server& server) {
81 return server.ref().server_id();
84 // Get channel_id of a channel
85 inline int64_t GetChannelID(const grpc::channelz::v1::Channel& channel) {
86 return channel.ref().channel_id();
89 // Get subchannel_id of a subchannel
90 inline int64_t GetSubchannelID(
91 const grpc::channelz::v1::Subchannel& subchannel) {
92 return subchannel.ref().subchannel_id();
95 // Get socket_id of a socket
96 inline int64_t GetSocketID(const grpc::channelz::v1::Socket& socket) {
97 return socket.ref().socket_id();
100 // Get name of a server
101 inline std::string GetServerName(const grpc::channelz::v1::Server& server) {
102 return server.ref().name();
105 // Get name of a channel
106 inline std::string GetChannelName(
107 const grpc::channelz::v1::Channel& channel) {
108 return channel.ref().name();
111 // Get name of a subchannel
112 inline std::string GetSubchannelName(
113 const grpc::channelz::v1::Subchannel& subchannel) {
114 return subchannel.ref().name();
117 // Get name of a socket
118 inline std::string GetSocketName(const grpc::channelz::v1::Socket& socket) {
119 return socket.ref().name();
122 // Get a channel based on channel_id
123 grpc::channelz::v1::Channel GetChannelRPC(int64_t channel_id) {
124 GetChannelRequest get_channel_request;
125 get_channel_request.set_channel_id(channel_id);
126 GetChannelResponse get_channel_response;
127 ClientContext get_channel_context;
128 get_channel_context.set_deadline(
129 grpc_timeout_seconds_to_deadline(rpc_timeout_seconds_));
130 Status status = channelz_stub_->GetChannel(
131 &get_channel_context, get_channel_request, &get_channel_response);
133 gpr_log(GPR_ERROR, "GetChannelRPC failed: %s",
134 get_channel_context.debug_error_string().c_str());
137 return get_channel_response.channel();
140 // Get a subchannel based on subchannel_id
141 grpc::channelz::v1::Subchannel GetSubchannelRPC(int64_t subchannel_id) {
142 GetSubchannelRequest get_subchannel_request;
143 get_subchannel_request.set_subchannel_id(subchannel_id);
144 GetSubchannelResponse get_subchannel_response;
145 ClientContext get_subchannel_context;
146 get_subchannel_context.set_deadline(
147 grpc_timeout_seconds_to_deadline(rpc_timeout_seconds_));
148 Status status = channelz_stub_->GetSubchannel(&get_subchannel_context,
149 get_subchannel_request,
150 &get_subchannel_response);
152 gpr_log(GPR_ERROR, "GetSubchannelRPC failed: %s",
153 get_subchannel_context.debug_error_string().c_str());
156 return get_subchannel_response.subchannel();
159 // get a socket based on socket_id
160 grpc::channelz::v1::Socket GetSocketRPC(int64_t socket_id) {
161 GetSocketRequest get_socket_request;
162 get_socket_request.set_socket_id(socket_id);
163 GetSocketResponse get_socket_response;
164 ClientContext get_socket_context;
165 get_socket_context.set_deadline(
166 grpc_timeout_seconds_to_deadline(rpc_timeout_seconds_));
167 Status status = channelz_stub_->GetSocket(
168 &get_socket_context, get_socket_request, &get_socket_response);
170 gpr_log(GPR_ERROR, "GetSocketRPC failed: %s",
171 get_socket_context.debug_error_string().c_str());
174 return get_socket_response.socket();
177 // get the descedent channels/subchannels/sockets of a channel
178 // push descedent channels/subchannels to queue for layer traverse
179 // store descedent channels/subchannels/sockets for dumping data
180 void GetChannelDescedence(
181 const grpc::channelz::v1::Channel& channel,
182 std::queue<grpc::channelz::v1::Channel>& channel_queue,
183 std::queue<grpc::channelz::v1::Subchannel>& subchannel_queue) {
184 std::cout << " Channel ID" << GetChannelID(channel) << "_"
185 << GetChannelName(channel) << " descendence - ";
186 if (channel.channel_ref_size() > 0 || channel.subchannel_ref_size() > 0) {
187 if (channel.channel_ref_size() > 0) {
188 std::cout << "channel: ";
189 for (const auto& _channelref : channel.channel_ref()) {
190 int64_t ch_id = _channelref.channel_id();
191 std::cout << "ID" << ch_id << "_" << _channelref.name() << " ";
192 grpc::channelz::v1::Channel ch = GetChannelRPC(ch_id);
193 channel_queue.push(ch);
194 if (CheckID(ch_id)) {
195 all_channels_.push_back(ch);
196 StoreChannelInJson(ch);
199 if (channel.subchannel_ref_size() > 0) {
203 if (channel.subchannel_ref_size() > 0) {
204 std::cout << "subchannel: ";
205 for (const auto& _subchannelref : channel.subchannel_ref()) {
206 int64_t subch_id = _subchannelref.subchannel_id();
207 std::cout << "ID" << subch_id << "_" << _subchannelref.name() << " ";
208 grpc::channelz::v1::Subchannel subch = GetSubchannelRPC(subch_id);
209 subchannel_queue.push(subch);
210 if (CheckID(subch_id)) {
211 all_subchannels_.push_back(subch);
212 StoreSubchannelInJson(subch);
216 } else if (channel.socket_ref_size() > 0) {
217 std::cout << "socket: ";
218 for (const auto& _socketref : channel.socket_ref()) {
219 int64_t so_id = _socketref.socket_id();
220 std::cout << "ID" << so_id << "_" << _socketref.name() << " ";
221 grpc::channelz::v1::Socket so = GetSocketRPC(so_id);
222 if (CheckID(so_id)) {
223 all_sockets_.push_back(so);
224 StoreSocketInJson(so);
228 std::cout << std::endl;
231 // get the descedent channels/subchannels/sockets of a subchannel
232 // push descedent channels/subchannels to queue for layer traverse
233 // store descedent channels/subchannels/sockets for dumping data
234 void GetSubchannelDescedence(
235 grpc::channelz::v1::Subchannel& subchannel,
236 std::queue<grpc::channelz::v1::Channel>& channel_queue,
237 std::queue<grpc::channelz::v1::Subchannel>& subchannel_queue) {
238 std::cout << " Subchannel ID" << GetSubchannelID(subchannel) << "_"
239 << GetSubchannelName(subchannel) << " descendence - ";
240 if (subchannel.channel_ref_size() > 0 ||
241 subchannel.subchannel_ref_size() > 0) {
242 if (subchannel.channel_ref_size() > 0) {
243 std::cout << "channel: ";
244 for (const auto& _channelref : subchannel.channel_ref()) {
245 int64_t ch_id = _channelref.channel_id();
246 std::cout << "ID" << ch_id << "_" << _channelref.name() << " ";
247 grpc::channelz::v1::Channel ch = GetChannelRPC(ch_id);
248 channel_queue.push(ch);
249 if (CheckID(ch_id)) {
250 all_channels_.push_back(ch);
251 StoreChannelInJson(ch);
254 if (subchannel.subchannel_ref_size() > 0) {
258 if (subchannel.subchannel_ref_size() > 0) {
259 std::cout << "subchannel: ";
260 for (const auto& _subchannelref : subchannel.subchannel_ref()) {
261 int64_t subch_id = _subchannelref.subchannel_id();
262 std::cout << "ID" << subch_id << "_" << _subchannelref.name() << " ";
263 grpc::channelz::v1::Subchannel subch = GetSubchannelRPC(subch_id);
264 subchannel_queue.push(subch);
265 if (CheckID(subch_id)) {
266 all_subchannels_.push_back(subch);
267 StoreSubchannelInJson(subch);
271 } else if (subchannel.socket_ref_size() > 0) {
272 std::cout << "socket: ";
273 for (const auto& _socketref : subchannel.socket_ref()) {
274 int64_t so_id = _socketref.socket_id();
275 std::cout << "ID" << so_id << "_" << _socketref.name() << " ";
276 grpc::channelz::v1::Socket so = GetSocketRPC(so_id);
277 if (CheckID(so_id)) {
278 all_sockets_.push_back(so);
279 StoreSocketInJson(so);
283 std::cout << std::endl;
286 // Set up the channelz sampler client
287 // Initialize json as an array
288 void Setup(const std::string& custom_credentials_type,
289 const std::string& server_address) {
290 json_ = grpc_core::Json::Array();
291 rpc_timeout_seconds_ = 20;
292 grpc::ChannelArguments channel_args;
293 std::shared_ptr<grpc::ChannelCredentials> channel_creds =
294 grpc::testing::GetCredentialsProvider()->GetChannelCredentials(
295 custom_credentials_type, &channel_args);
296 if (!channel_creds) {
298 "Wrong user credential type: %s. Allowed credential types: "
299 "INSECURE_CREDENTIALS, ssl, alts, google_default_credentials.",
300 custom_credentials_type.c_str());
303 std::shared_ptr<grpc::Channel> channel =
304 CreateChannel(server_address, channel_creds);
305 channelz_stub_ = grpc::channelz::v1::Channelz::NewStub(channel);
308 // Get all servers, keep querying until getting all
309 // Store servers for dumping data
310 // Need to check id repeating for servers
311 void GetServersRPC() {
312 int64_t server_start_id = 0;
314 GetServersRequest get_servers_request;
315 GetServersResponse get_servers_response;
316 ClientContext get_servers_context;
317 get_servers_context.set_deadline(
318 grpc_timeout_seconds_to_deadline(rpc_timeout_seconds_));
319 get_servers_request.set_start_server_id(server_start_id);
320 Status status = channelz_stub_->GetServers(
321 &get_servers_context, get_servers_request, &get_servers_response);
323 if (status.error_code() == StatusCode::UNIMPLEMENTED) {
325 "Error status UNIMPLEMENTED. Please check and make sure "
326 "channelz has been registered on the server being queried.");
329 "GetServers RPC with GetServersRequest.server_start_id=%d, "
331 int(server_start_id),
332 get_servers_context.debug_error_string().c_str());
336 for (const auto& _server : get_servers_response.server()) {
337 all_servers_.push_back(_server);
338 StoreServerInJson(_server);
340 if (!get_servers_response.end()) {
341 server_start_id = GetServerID(all_servers_.back()) + 1;
346 std::cout << "Number of servers = " << all_servers_.size() << std::endl;
349 // Get sockets that belongs to servers
350 // Store sockets for dumping data
351 void GetSocketsOfServers() {
352 for (const auto& _server : all_servers_) {
353 std::cout << "Server ID" << GetServerID(_server) << "_"
354 << GetServerName(_server) << " listen_socket - ";
355 for (const auto& _socket : _server.listen_socket()) {
356 int64_t so_id = _socket.socket_id();
357 std::cout << "ID" << so_id << "_" << _socket.name() << " ";
358 if (CheckID(so_id)) {
359 grpc::channelz::v1::Socket so = GetSocketRPC(so_id);
360 all_sockets_.push_back(so);
361 StoreSocketInJson(so);
364 std::cout << std::endl;
368 // Get all top channels, keep querying until getting all
369 // Store channels for dumping data
370 // No need to check id repeating for top channels
371 void GetTopChannelsRPC() {
372 int64_t channel_start_id = 0;
374 GetTopChannelsRequest get_top_channels_request;
375 GetTopChannelsResponse get_top_channels_response;
376 ClientContext get_top_channels_context;
377 get_top_channels_context.set_deadline(
378 grpc_timeout_seconds_to_deadline(rpc_timeout_seconds_));
379 get_top_channels_request.set_start_channel_id(channel_start_id);
380 Status status = channelz_stub_->GetTopChannels(
381 &get_top_channels_context, get_top_channels_request,
382 &get_top_channels_response);
385 "GetTopChannels RPC with "
386 "GetTopChannelsRequest.channel_start_id=%d failed: %s",
387 int(channel_start_id),
388 get_top_channels_context.debug_error_string().c_str());
391 for (const auto& _topchannel : get_top_channels_response.channel()) {
392 top_channels_.push_back(_topchannel);
393 all_channels_.push_back(_topchannel);
394 StoreChannelInJson(_topchannel);
396 if (!get_top_channels_response.end()) {
397 channel_start_id = GetChannelID(top_channels_.back()) + 1;
402 std::cout << std::endl
403 << "Number of top channels = " << top_channels_.size()
407 // layer traverse for each top channel
408 void TraverseTopChannels() {
409 for (const auto& _topchannel : top_channels_) {
411 std::queue<grpc::channelz::v1::Channel> channel_queue;
412 std::queue<grpc::channelz::v1::Subchannel> subchannel_queue;
413 std::cout << "Tree depth = " << tree_depth << std::endl;
414 GetChannelDescedence(_topchannel, channel_queue, subchannel_queue);
415 while (!channel_queue.empty() || !subchannel_queue.empty()) {
417 std::cout << "Tree depth = " << tree_depth << std::endl;
418 int ch_q_size = channel_queue.size();
419 int subch_q_size = subchannel_queue.size();
420 for (int i = 0; i < ch_q_size; ++i) {
421 grpc::channelz::v1::Channel ch = channel_queue.front();
423 GetChannelDescedence(ch, channel_queue, subchannel_queue);
425 for (int i = 0; i < subch_q_size; ++i) {
426 grpc::channelz::v1::Subchannel subch = subchannel_queue.front();
427 subchannel_queue.pop();
428 GetSubchannelDescedence(subch, channel_queue, subchannel_queue);
431 std::cout << std::endl;
435 // dump data of all entities to stdout
437 std::string data_str;
438 for (const auto& _channel : all_channels_) {
439 std::cout << "channel ID" << GetChannelID(_channel) << "_"
440 << GetChannelName(_channel) << " data:" << std::endl;
441 // TODO(mohanli): TextFormat::PrintToString records time as seconds and
442 // nanos. Need a more human readable way.
443 ::google::protobuf::TextFormat::PrintToString(_channel.data(), &data_str);
444 printf("%s\n", data_str.c_str());
446 for (const auto& _subchannel : all_subchannels_) {
447 std::cout << "subchannel ID" << GetSubchannelID(_subchannel) << "_"
448 << GetSubchannelName(_subchannel) << " data:" << std::endl;
449 ::google::protobuf::TextFormat::PrintToString(_subchannel.data(),
451 printf("%s\n", data_str.c_str());
453 for (const auto& _server : all_servers_) {
454 std::cout << "server ID" << GetServerID(_server) << "_"
455 << GetServerName(_server) << " data:" << std::endl;
456 ::google::protobuf::TextFormat::PrintToString(_server.data(), &data_str);
457 printf("%s\n", data_str.c_str());
459 for (const auto& _socket : all_sockets_) {
460 std::cout << "socket ID" << GetSocketID(_socket) << "_"
461 << GetSocketName(_socket) << " data:" << std::endl;
462 ::google::protobuf::TextFormat::PrintToString(_socket.data(), &data_str);
463 printf("%s\n", data_str.c_str());
467 // Store a channel in Json
468 void StoreChannelInJson(const grpc::channelz::v1::Channel& channel) {
469 std::string id = grpc::to_string(GetChannelID(channel));
470 std::string type = "Channel";
471 std::string description;
472 ::google::protobuf::TextFormat::PrintToString(channel.data(), &description);
473 grpc_core::Json description_json = grpc_core::Json(description);
474 StoreEntityInJson(id, type, description_json);
477 // Store a subchannel in Json
478 void StoreSubchannelInJson(const grpc::channelz::v1::Subchannel& subchannel) {
479 std::string id = grpc::to_string(GetSubchannelID(subchannel));
480 std::string type = "Subchannel";
481 std::string description;
482 ::google::protobuf::TextFormat::PrintToString(subchannel.data(),
484 grpc_core::Json description_json = grpc_core::Json(description);
485 StoreEntityInJson(id, type, description_json);
488 // Store a server in Json
489 void StoreServerInJson(const grpc::channelz::v1::Server& server) {
490 std::string id = grpc::to_string(GetServerID(server));
491 std::string type = "Server";
492 std::string description;
493 ::google::protobuf::TextFormat::PrintToString(server.data(), &description);
494 grpc_core::Json description_json = grpc_core::Json(description);
495 StoreEntityInJson(id, type, description_json);
498 // Store a socket in Json
499 void StoreSocketInJson(const grpc::channelz::v1::Socket& socket) {
500 std::string id = grpc::to_string(GetSocketID(socket));
501 std::string type = "Socket";
502 std::string description;
503 ::google::protobuf::TextFormat::PrintToString(socket.data(), &description);
504 grpc_core::Json description_json = grpc_core::Json(description);
505 StoreEntityInJson(id, type, description_json);
508 // Store an entity in Json
509 void StoreEntityInJson(std::string& id, std::string& type,
510 const grpc_core::Json& description) {
511 std::string start, finish;
512 gpr_timespec ago = gpr_time_sub(
514 gpr_time_from_seconds(FLAGS_sampling_interval_seconds, GPR_TIMESPAN));
515 std::stringstream ss;
516 const time_t time_now = now_.tv_sec;
517 ss << std::put_time(std::localtime(&time_now), "%F %T");
518 finish = ss.str(); // example: "2019-02-01 12:12:18"
520 const time_t time_ago = ago.tv_sec;
521 ss << std::put_time(std::localtime(&time_ago), "%F %T");
523 grpc_core::Json obj =
524 grpc_core::Json::Object{{"Task", absl::StrFormat("%s_ID%s", type, id)},
529 {"Description", description}};
530 json_.mutable_array()->push_back(obj);
534 std::string DumpJson() { return json_.Dump(); }
536 // Check if one entity has been recorded
537 bool CheckID(int64_t id) {
538 if (id_set_.count(id) == 0) {
546 // Record current time
547 void RecordNow() { now_ = gpr_now(GPR_CLOCK_REALTIME); }
550 std::unique_ptr<grpc::channelz::v1::Channelz::Stub> channelz_stub_;
551 std::vector<grpc::channelz::v1::Channel> top_channels_;
552 std::vector<grpc::channelz::v1::Server> all_servers_;
553 std::vector<grpc::channelz::v1::Channel> all_channels_;
554 std::vector<grpc::channelz::v1::Subchannel> all_subchannels_;
555 std::vector<grpc::channelz::v1::Socket> all_sockets_;
556 std::unordered_set<int64_t> id_set_;
557 grpc_core::Json json_;
558 int64_t rpc_timeout_seconds_;
562 int main(int argc, char** argv) {
563 grpc::testing::TestEnvironment env(argc, argv);
564 grpc::testing::InitTest(&argc, &argv, true);
565 std::ofstream output_file(FLAGS_output_json);
566 for (int i = 0; i < FLAGS_sampling_times; ++i) {
567 ChannelzSampler channelz_sampler;
568 channelz_sampler.Setup(FLAGS_custom_credentials_type, FLAGS_server_address);
569 std::cout << "Wait for sampling interval "
570 << FLAGS_sampling_interval_seconds << "s..." << std::endl;
571 const gpr_timespec kDelay = gpr_time_add(
572 gpr_now(GPR_CLOCK_MONOTONIC),
573 gpr_time_from_seconds(FLAGS_sampling_interval_seconds, GPR_TIMESPAN));
574 gpr_sleep_until(kDelay);
575 std::cout << "##### " << i << "th sampling #####" << std::endl;
576 channelz_sampler.RecordNow();
577 channelz_sampler.GetServersRPC();
578 channelz_sampler.GetSocketsOfServers();
579 channelz_sampler.GetTopChannelsRPC();
580 channelz_sampler.TraverseTopChannels();
581 channelz_sampler.DumpStdout();
582 if (!FLAGS_output_json.empty()) {
583 output_file << channelz_sampler.DumpJson() << "\n" << std::flush;