3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
28 #include "absl/flags/flag.h"
29 #include "absl/strings/str_format.h"
30 #include "absl/strings/str_join.h"
31 #include "google/protobuf/text_format.h"
32 #include "grpc/grpc.h"
33 #include "grpc/support/port_platform.h"
34 #include "grpcpp/channel.h"
35 #include "grpcpp/client_context.h"
36 #include "grpcpp/create_channel.h"
37 #include "grpcpp/ext/channelz_service_plugin.h"
38 #include "grpcpp/grpcpp.h"
39 #include "grpcpp/security/credentials.h"
40 #include "grpcpp/security/server_credentials.h"
41 #include "grpcpp/server.h"
42 #include "grpcpp/server_builder.h"
43 #include "grpcpp/server_context.h"
44 #include "src/core/lib/json/json.h"
45 #include "src/cpp/server/channelz/channelz_service.h"
46 #include "src/proto/grpc/channelz/channelz.pb.h"
47 #include "test/core/util/test_config.h"
48 #include "test/cpp/util/test_config.h"
49 #include "test/cpp/util/test_credentials_provider.h"
51 ABSL_FLAG(std::string, server_address, "", "channelz server address");
52 ABSL_FLAG(std::string, custom_credentials_type, "", "custom credentials type");
53 ABSL_FLAG(int64_t, sampling_times, 1, "number of sampling");
54 // TODO(Capstan): Consider using absl::Duration
55 ABSL_FLAG(int64_t, sampling_interval_seconds, 0,
56 "sampling interval in seconds");
57 ABSL_FLAG(std::string, output_json, "", "output filename in json format");
60 using grpc::ClientContext;
62 using grpc::StatusCode;
63 using grpc::channelz::v1::GetChannelRequest;
64 using grpc::channelz::v1::GetChannelResponse;
65 using grpc::channelz::v1::GetServersRequest;
66 using grpc::channelz::v1::GetServersResponse;
67 using grpc::channelz::v1::GetSocketRequest;
68 using grpc::channelz::v1::GetSocketResponse;
69 using grpc::channelz::v1::GetSubchannelRequest;
70 using grpc::channelz::v1::GetSubchannelResponse;
71 using grpc::channelz::v1::GetTopChannelsRequest;
72 using grpc::channelz::v1::GetTopChannelsResponse;
75 class ChannelzSampler final {
77 // Get server_id of a server
78 int64_t GetServerID(const grpc::channelz::v1::Server& server) {
79 return server.ref().server_id();
82 // Get channel_id of a channel
83 inline int64_t GetChannelID(const grpc::channelz::v1::Channel& channel) {
84 return channel.ref().channel_id();
87 // Get subchannel_id of a subchannel
88 inline int64_t GetSubchannelID(
89 const grpc::channelz::v1::Subchannel& subchannel) {
90 return subchannel.ref().subchannel_id();
93 // Get socket_id of a socket
94 inline int64_t GetSocketID(const grpc::channelz::v1::Socket& socket) {
95 return socket.ref().socket_id();
98 // Get name of a server
99 inline std::string GetServerName(const grpc::channelz::v1::Server& server) {
100 return server.ref().name();
103 // Get name of a channel
104 inline std::string GetChannelName(
105 const grpc::channelz::v1::Channel& channel) {
106 return channel.ref().name();
109 // Get name of a subchannel
110 inline std::string GetSubchannelName(
111 const grpc::channelz::v1::Subchannel& subchannel) {
112 return subchannel.ref().name();
115 // Get name of a socket
116 inline std::string GetSocketName(const grpc::channelz::v1::Socket& socket) {
117 return socket.ref().name();
120 // Get a channel based on channel_id
121 grpc::channelz::v1::Channel GetChannelRPC(int64_t channel_id) {
122 GetChannelRequest get_channel_request;
123 get_channel_request.set_channel_id(channel_id);
124 GetChannelResponse get_channel_response;
125 ClientContext get_channel_context;
126 get_channel_context.set_deadline(
127 grpc_timeout_seconds_to_deadline(rpc_timeout_seconds_));
128 Status status = channelz_stub_->GetChannel(
129 &get_channel_context, get_channel_request, &get_channel_response);
131 gpr_log(GPR_ERROR, "GetChannelRPC failed: %s",
132 get_channel_context.debug_error_string().c_str());
135 return get_channel_response.channel();
138 // Get a subchannel based on subchannel_id
139 grpc::channelz::v1::Subchannel GetSubchannelRPC(int64_t subchannel_id) {
140 GetSubchannelRequest get_subchannel_request;
141 get_subchannel_request.set_subchannel_id(subchannel_id);
142 GetSubchannelResponse get_subchannel_response;
143 ClientContext get_subchannel_context;
144 get_subchannel_context.set_deadline(
145 grpc_timeout_seconds_to_deadline(rpc_timeout_seconds_));
146 Status status = channelz_stub_->GetSubchannel(&get_subchannel_context,
147 get_subchannel_request,
148 &get_subchannel_response);
150 gpr_log(GPR_ERROR, "GetSubchannelRPC failed: %s",
151 get_subchannel_context.debug_error_string().c_str());
154 return get_subchannel_response.subchannel();
157 // get a socket based on socket_id
158 grpc::channelz::v1::Socket GetSocketRPC(int64_t socket_id) {
159 GetSocketRequest get_socket_request;
160 get_socket_request.set_socket_id(socket_id);
161 GetSocketResponse get_socket_response;
162 ClientContext get_socket_context;
163 get_socket_context.set_deadline(
164 grpc_timeout_seconds_to_deadline(rpc_timeout_seconds_));
165 Status status = channelz_stub_->GetSocket(
166 &get_socket_context, get_socket_request, &get_socket_response);
168 gpr_log(GPR_ERROR, "GetSocketRPC failed: %s",
169 get_socket_context.debug_error_string().c_str());
172 return get_socket_response.socket();
175 // get the descedent channels/subchannels/sockets of a channel
176 // push descedent channels/subchannels to queue for layer traverse
177 // store descedent channels/subchannels/sockets for dumping data
178 void GetChannelDescedence(
179 const grpc::channelz::v1::Channel& channel,
180 std::queue<grpc::channelz::v1::Channel>& channel_queue,
181 std::queue<grpc::channelz::v1::Subchannel>& subchannel_queue) {
182 std::cout << " Channel ID" << GetChannelID(channel) << "_"
183 << GetChannelName(channel) << " descendence - ";
184 if (channel.channel_ref_size() > 0 || channel.subchannel_ref_size() > 0) {
185 if (channel.channel_ref_size() > 0) {
186 std::cout << "channel: ";
187 for (const auto& _channelref : channel.channel_ref()) {
188 int64_t ch_id = _channelref.channel_id();
189 std::cout << "ID" << ch_id << "_" << _channelref.name() << " ";
190 grpc::channelz::v1::Channel ch = GetChannelRPC(ch_id);
191 channel_queue.push(ch);
192 if (CheckID(ch_id)) {
193 all_channels_.push_back(ch);
194 StoreChannelInJson(ch);
197 if (channel.subchannel_ref_size() > 0) {
201 if (channel.subchannel_ref_size() > 0) {
202 std::cout << "subchannel: ";
203 for (const auto& _subchannelref : channel.subchannel_ref()) {
204 int64_t subch_id = _subchannelref.subchannel_id();
205 std::cout << "ID" << subch_id << "_" << _subchannelref.name() << " ";
206 grpc::channelz::v1::Subchannel subch = GetSubchannelRPC(subch_id);
207 subchannel_queue.push(subch);
208 if (CheckID(subch_id)) {
209 all_subchannels_.push_back(subch);
210 StoreSubchannelInJson(subch);
214 } else if (channel.socket_ref_size() > 0) {
215 std::cout << "socket: ";
216 for (const auto& _socketref : channel.socket_ref()) {
217 int64_t so_id = _socketref.socket_id();
218 std::cout << "ID" << so_id << "_" << _socketref.name() << " ";
219 grpc::channelz::v1::Socket so = GetSocketRPC(so_id);
220 if (CheckID(so_id)) {
221 all_sockets_.push_back(so);
222 StoreSocketInJson(so);
226 std::cout << std::endl;
229 // get the descedent channels/subchannels/sockets of a subchannel
230 // push descedent channels/subchannels to queue for layer traverse
231 // store descedent channels/subchannels/sockets for dumping data
232 void GetSubchannelDescedence(
233 grpc::channelz::v1::Subchannel& subchannel,
234 std::queue<grpc::channelz::v1::Channel>& channel_queue,
235 std::queue<grpc::channelz::v1::Subchannel>& subchannel_queue) {
236 std::cout << " Subchannel ID" << GetSubchannelID(subchannel) << "_"
237 << GetSubchannelName(subchannel) << " descendence - ";
238 if (subchannel.channel_ref_size() > 0 ||
239 subchannel.subchannel_ref_size() > 0) {
240 if (subchannel.channel_ref_size() > 0) {
241 std::cout << "channel: ";
242 for (const auto& _channelref : subchannel.channel_ref()) {
243 int64_t ch_id = _channelref.channel_id();
244 std::cout << "ID" << ch_id << "_" << _channelref.name() << " ";
245 grpc::channelz::v1::Channel ch = GetChannelRPC(ch_id);
246 channel_queue.push(ch);
247 if (CheckID(ch_id)) {
248 all_channels_.push_back(ch);
249 StoreChannelInJson(ch);
252 if (subchannel.subchannel_ref_size() > 0) {
256 if (subchannel.subchannel_ref_size() > 0) {
257 std::cout << "subchannel: ";
258 for (const auto& _subchannelref : subchannel.subchannel_ref()) {
259 int64_t subch_id = _subchannelref.subchannel_id();
260 std::cout << "ID" << subch_id << "_" << _subchannelref.name() << " ";
261 grpc::channelz::v1::Subchannel subch = GetSubchannelRPC(subch_id);
262 subchannel_queue.push(subch);
263 if (CheckID(subch_id)) {
264 all_subchannels_.push_back(subch);
265 StoreSubchannelInJson(subch);
269 } else if (subchannel.socket_ref_size() > 0) {
270 std::cout << "socket: ";
271 for (const auto& _socketref : subchannel.socket_ref()) {
272 int64_t so_id = _socketref.socket_id();
273 std::cout << "ID" << so_id << "_" << _socketref.name() << " ";
274 grpc::channelz::v1::Socket so = GetSocketRPC(so_id);
275 if (CheckID(so_id)) {
276 all_sockets_.push_back(so);
277 StoreSocketInJson(so);
281 std::cout << std::endl;
284 // Set up the channelz sampler client
285 // Initialize json as an array
286 void Setup(const std::string& custom_credentials_type,
287 const std::string& server_address) {
288 json_ = grpc_core::Json::Array();
289 rpc_timeout_seconds_ = 20;
290 grpc::ChannelArguments channel_args;
291 std::shared_ptr<grpc::ChannelCredentials> channel_creds =
292 grpc::testing::GetCredentialsProvider()->GetChannelCredentials(
293 custom_credentials_type, &channel_args);
294 if (!channel_creds) {
296 "Wrong user credential type: %s. Allowed credential types: "
297 "INSECURE_CREDENTIALS, ssl, alts, google_default_credentials.",
298 custom_credentials_type.c_str());
301 std::shared_ptr<grpc::Channel> channel =
302 CreateChannel(server_address, channel_creds);
303 channelz_stub_ = grpc::channelz::v1::Channelz::NewStub(channel);
306 // Get all servers, keep querying until getting all
307 // Store servers for dumping data
308 // Need to check id repeating for servers
309 void GetServersRPC() {
310 int64_t server_start_id = 0;
312 GetServersRequest get_servers_request;
313 GetServersResponse get_servers_response;
314 ClientContext get_servers_context;
315 get_servers_context.set_deadline(
316 grpc_timeout_seconds_to_deadline(rpc_timeout_seconds_));
317 get_servers_request.set_start_server_id(server_start_id);
318 Status status = channelz_stub_->GetServers(
319 &get_servers_context, get_servers_request, &get_servers_response);
321 if (status.error_code() == StatusCode::UNIMPLEMENTED) {
323 "Error status UNIMPLEMENTED. Please check and make sure "
324 "channelz has been registered on the server being queried.");
327 "GetServers RPC with GetServersRequest.server_start_id=%d, "
329 int(server_start_id),
330 get_servers_context.debug_error_string().c_str());
334 for (const auto& _server : get_servers_response.server()) {
335 all_servers_.push_back(_server);
336 StoreServerInJson(_server);
338 if (!get_servers_response.end()) {
339 server_start_id = GetServerID(all_servers_.back()) + 1;
344 std::cout << "Number of servers = " << all_servers_.size() << std::endl;
347 // Get sockets that belongs to servers
348 // Store sockets for dumping data
349 void GetSocketsOfServers() {
350 for (const auto& _server : all_servers_) {
351 std::cout << "Server ID" << GetServerID(_server) << "_"
352 << GetServerName(_server) << " listen_socket - ";
353 for (const auto& _socket : _server.listen_socket()) {
354 int64_t so_id = _socket.socket_id();
355 std::cout << "ID" << so_id << "_" << _socket.name() << " ";
356 if (CheckID(so_id)) {
357 grpc::channelz::v1::Socket so = GetSocketRPC(so_id);
358 all_sockets_.push_back(so);
359 StoreSocketInJson(so);
362 std::cout << std::endl;
366 // Get all top channels, keep querying until getting all
367 // Store channels for dumping data
368 // No need to check id repeating for top channels
369 void GetTopChannelsRPC() {
370 int64_t channel_start_id = 0;
372 GetTopChannelsRequest get_top_channels_request;
373 GetTopChannelsResponse get_top_channels_response;
374 ClientContext get_top_channels_context;
375 get_top_channels_context.set_deadline(
376 grpc_timeout_seconds_to_deadline(rpc_timeout_seconds_));
377 get_top_channels_request.set_start_channel_id(channel_start_id);
378 Status status = channelz_stub_->GetTopChannels(
379 &get_top_channels_context, get_top_channels_request,
380 &get_top_channels_response);
383 "GetTopChannels RPC with "
384 "GetTopChannelsRequest.channel_start_id=%d failed: %s",
385 int(channel_start_id),
386 get_top_channels_context.debug_error_string().c_str());
389 for (const auto& _topchannel : get_top_channels_response.channel()) {
390 top_channels_.push_back(_topchannel);
391 all_channels_.push_back(_topchannel);
392 StoreChannelInJson(_topchannel);
394 if (!get_top_channels_response.end()) {
395 channel_start_id = GetChannelID(top_channels_.back()) + 1;
400 std::cout << std::endl
401 << "Number of top channels = " << top_channels_.size()
405 // layer traverse for each top channel
406 void TraverseTopChannels() {
407 for (const auto& _topchannel : top_channels_) {
409 std::queue<grpc::channelz::v1::Channel> channel_queue;
410 std::queue<grpc::channelz::v1::Subchannel> subchannel_queue;
411 std::cout << "Tree depth = " << tree_depth << std::endl;
412 GetChannelDescedence(_topchannel, channel_queue, subchannel_queue);
413 while (!channel_queue.empty() || !subchannel_queue.empty()) {
415 std::cout << "Tree depth = " << tree_depth << std::endl;
416 int ch_q_size = channel_queue.size();
417 int subch_q_size = subchannel_queue.size();
418 for (int i = 0; i < ch_q_size; ++i) {
419 grpc::channelz::v1::Channel ch = channel_queue.front();
421 GetChannelDescedence(ch, channel_queue, subchannel_queue);
423 for (int i = 0; i < subch_q_size; ++i) {
424 grpc::channelz::v1::Subchannel subch = subchannel_queue.front();
425 subchannel_queue.pop();
426 GetSubchannelDescedence(subch, channel_queue, subchannel_queue);
429 std::cout << std::endl;
433 // dump data of all entities to stdout
435 std::string data_str;
436 for (const auto& _channel : all_channels_) {
437 std::cout << "channel ID" << GetChannelID(_channel) << "_"
438 << GetChannelName(_channel) << " data:" << std::endl;
439 // TODO(mohanli): TextFormat::PrintToString records time as seconds and
440 // nanos. Need a more human readable way.
441 ::google::protobuf::TextFormat::PrintToString(_channel.data(), &data_str);
442 printf("%s\n", data_str.c_str());
444 for (const auto& _subchannel : all_subchannels_) {
445 std::cout << "subchannel ID" << GetSubchannelID(_subchannel) << "_"
446 << GetSubchannelName(_subchannel) << " data:" << std::endl;
447 ::google::protobuf::TextFormat::PrintToString(_subchannel.data(),
449 printf("%s\n", data_str.c_str());
451 for (const auto& _server : all_servers_) {
452 std::cout << "server ID" << GetServerID(_server) << "_"
453 << GetServerName(_server) << " data:" << std::endl;
454 ::google::protobuf::TextFormat::PrintToString(_server.data(), &data_str);
455 printf("%s\n", data_str.c_str());
457 for (const auto& _socket : all_sockets_) {
458 std::cout << "socket ID" << GetSocketID(_socket) << "_"
459 << GetSocketName(_socket) << " data:" << std::endl;
460 ::google::protobuf::TextFormat::PrintToString(_socket.data(), &data_str);
461 printf("%s\n", data_str.c_str());
465 // Store a channel in Json
466 void StoreChannelInJson(const grpc::channelz::v1::Channel& channel) {
467 std::string id = grpc::to_string(GetChannelID(channel));
468 std::string type = "Channel";
469 std::string description;
470 ::google::protobuf::TextFormat::PrintToString(channel.data(), &description);
471 grpc_core::Json description_json = grpc_core::Json(description);
472 StoreEntityInJson(id, type, description_json);
475 // Store a subchannel in Json
476 void StoreSubchannelInJson(const grpc::channelz::v1::Subchannel& subchannel) {
477 std::string id = grpc::to_string(GetSubchannelID(subchannel));
478 std::string type = "Subchannel";
479 std::string description;
480 ::google::protobuf::TextFormat::PrintToString(subchannel.data(),
482 grpc_core::Json description_json = grpc_core::Json(description);
483 StoreEntityInJson(id, type, description_json);
486 // Store a server in Json
487 void StoreServerInJson(const grpc::channelz::v1::Server& server) {
488 std::string id = grpc::to_string(GetServerID(server));
489 std::string type = "Server";
490 std::string description;
491 ::google::protobuf::TextFormat::PrintToString(server.data(), &description);
492 grpc_core::Json description_json = grpc_core::Json(description);
493 StoreEntityInJson(id, type, description_json);
496 // Store a socket in Json
497 void StoreSocketInJson(const grpc::channelz::v1::Socket& socket) {
498 std::string id = grpc::to_string(GetSocketID(socket));
499 std::string type = "Socket";
500 std::string description;
501 ::google::protobuf::TextFormat::PrintToString(socket.data(), &description);
502 grpc_core::Json description_json = grpc_core::Json(description);
503 StoreEntityInJson(id, type, description_json);
506 // Store an entity in Json
507 void StoreEntityInJson(std::string& id, std::string& type,
508 const grpc_core::Json& description) {
509 std::string start, finish;
510 gpr_timespec ago = gpr_time_sub(
512 gpr_time_from_seconds(absl::GetFlag(FLAGS_sampling_interval_seconds),
514 std::stringstream ss;
515 const time_t time_now = now_.tv_sec;
516 ss << std::put_time(std::localtime(&time_now), "%F %T");
517 finish = ss.str(); // example: "2019-02-01 12:12:18"
519 const time_t time_ago = ago.tv_sec;
520 ss << std::put_time(std::localtime(&time_ago), "%F %T");
522 grpc_core::Json obj =
523 grpc_core::Json::Object{{"Task", absl::StrFormat("%s_ID%s", type, id)},
528 {"Description", description}};
529 json_.mutable_array()->push_back(obj);
533 std::string DumpJson() { return json_.Dump(); }
535 // Check if one entity has been recorded
536 bool CheckID(int64_t id) {
537 if (id_set_.count(id) == 0) {
545 // Record current time
546 void RecordNow() { now_ = gpr_now(GPR_CLOCK_REALTIME); }
549 std::unique_ptr<grpc::channelz::v1::Channelz::Stub> channelz_stub_;
550 std::vector<grpc::channelz::v1::Channel> top_channels_;
551 std::vector<grpc::channelz::v1::Server> all_servers_;
552 std::vector<grpc::channelz::v1::Channel> all_channels_;
553 std::vector<grpc::channelz::v1::Subchannel> all_subchannels_;
554 std::vector<grpc::channelz::v1::Socket> all_sockets_;
555 std::unordered_set<int64_t> id_set_;
556 grpc_core::Json json_;
557 int64_t rpc_timeout_seconds_;
561 int main(int argc, char** argv) {
562 grpc::testing::TestEnvironment env(argc, argv);
563 grpc::testing::InitTest(&argc, &argv, true);
564 std::ofstream output_file(absl::GetFlag(FLAGS_output_json));
565 for (int i = 0; i < absl::GetFlag(FLAGS_sampling_times); ++i) {
566 ChannelzSampler channelz_sampler;
567 channelz_sampler.Setup(absl::GetFlag(FLAGS_custom_credentials_type),
568 absl::GetFlag(FLAGS_server_address));
569 std::cout << "Wait for sampling interval "
570 << absl::GetFlag(FLAGS_sampling_interval_seconds) << "s..."
572 const gpr_timespec kDelay = gpr_time_add(
573 gpr_now(GPR_CLOCK_MONOTONIC),
574 gpr_time_from_seconds(absl::GetFlag(FLAGS_sampling_interval_seconds),
576 gpr_sleep_until(kDelay);
577 std::cout << "##### " << i << "th sampling #####" << std::endl;
578 channelz_sampler.RecordNow();
579 channelz_sampler.GetServersRPC();
580 channelz_sampler.GetSocketsOfServers();
581 channelz_sampler.GetTopChannelsRPC();
582 channelz_sampler.TraverseTopChannels();
583 channelz_sampler.DumpStdout();
584 if (!absl::GetFlag(FLAGS_output_json).empty()) {
585 output_file << channelz_sampler.DumpJson() << "\n" << std::flush;