362ea529259b18baecbf4b09031af72fdc9dcc4c
[platform/upstream/grpc.git] / src / cpp / ext / filters / census / client_filter.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string>
22 #include <utility>
23 #include <vector>
24
25 #include "src/cpp/ext/filters/census/client_filter.h"
26
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/string_view.h"
29 #include "opencensus/stats/stats.h"
30 #include "opencensus/tags/context_util.h"
31 #include "opencensus/tags/tag_key.h"
32 #include "opencensus/tags/tag_map.h"
33 #include "src/core/lib/surface/call.h"
34 #include "src/cpp/ext/filters/census/grpc_plugin.h"
35 #include "src/cpp/ext/filters/census/measures.h"
36
37 namespace grpc {
38
39 constexpr uint32_t
40     OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTraceContextLen;
41 constexpr uint32_t
42     OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen;
43
44 grpc_error_handle CensusClientCallData::Init(
45     grpc_call_element* /* elem */, const grpc_call_element_args* args) {
46   auto tracer = args->arena->New<OpenCensusCallTracer>(args);
47   GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
48   args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
49   args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) {
50     (static_cast<OpenCensusCallTracer*>(tracer))->~OpenCensusCallTracer();
51   };
52   return GRPC_ERROR_NONE;
53 }
54
55 //
56 // OpenCensusCallTracer::OpenCensusCallAttemptTracer
57 //
58
59 namespace {
60
61 CensusContext CreateCensusContextForCallAttempt(
62     absl::string_view method, const CensusContext& parent_context) {
63   GPR_DEBUG_ASSERT(parent_context.Context().IsValid());
64   return CensusContext(absl::StrCat("Attempt.", method), &parent_context.Span(),
65                        parent_context.tags());
66 }
67
68 }  // namespace
69
70 OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
71     OpenCensusCallTracer* parent, uint64_t attempt_num,
72     bool is_transparent_retry, bool arena_allocated)
73     : parent_(parent),
74       arena_allocated_(arena_allocated),
75       context_(CreateCensusContextForCallAttempt(parent_->method_,
76                                                  parent_->context_)),
77       start_time_(absl::Now()) {
78   context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
79   context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
80   memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem));
81   memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem));
82 }
83
84 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
85     RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata,
86                               uint32_t /* flags */) {
87   size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
88                                              kMaxTraceContextLen);
89   if (tracing_len > 0) {
90     GRPC_LOG_IF_ERROR(
91         "census grpc_filter",
92         grpc_metadata_batch_add_tail(
93             send_initial_metadata, &tracing_bin_,
94             grpc_mdelem_from_slices(
95                 GRPC_MDSTR_GRPC_TRACE_BIN,
96                 grpc_core::UnmanagedMemorySlice(tracing_buf_, tracing_len)),
97             GRPC_BATCH_GRPC_TRACE_BIN));
98   }
99   grpc_slice tags = grpc_empty_slice();
100   // TODO(unknown): Add in tagging serialization.
101   size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
102   if (encoded_tags_len > 0) {
103     GRPC_LOG_IF_ERROR(
104         "census grpc_filter",
105         grpc_metadata_batch_add_tail(
106             send_initial_metadata, &stats_bin_,
107             grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags),
108             GRPC_BATCH_GRPC_TAGS_BIN));
109   }
110 }
111
112 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
113     const grpc_core::ByteStream& /* send_message */) {
114   ++sent_message_count_;
115 }
116
117 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
118     const grpc_core::ByteStream& /* recv_message */) {
119   ++recv_message_count_;
120 }
121
122 namespace {
123
124 void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
125   if (b->idx.named.grpc_server_stats_bin != nullptr) {
126     ServerStatsDeserialize(
127         reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(
128             GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))),
129         GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)),
130         elapsed_time);
131     grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin);
132   }
133 }
134
135 }  // namespace
136
137 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
138     RecordReceivedTrailingMetadata(
139         absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
140         const grpc_transport_stream_stats& transport_stream_stats) {
141   FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time_);
142   const uint64_t request_size = transport_stream_stats.outgoing.data_bytes;
143   const uint64_t response_size = transport_stream_stats.incoming.data_bytes;
144   std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
145       context_.tags().tags();
146   tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
147   status_code_ = status.code();
148   std::string final_status = absl::StatusCodeToString(status_code_);
149   tags.emplace_back(ClientStatusTagKey(), final_status);
150   ::opencensus::stats::Record(
151       {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
152        {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
153        {RpcClientServerLatency(),
154         ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}},
155       tags);
156 }
157
158 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
159     grpc_error_handle cancel_error) {
160   status_code_ = absl::StatusCode::kCancelled;
161   GRPC_ERROR_UNREF(cancel_error);
162 }
163
164 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
165     const gpr_timespec& /* latency */) {
166   double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
167   std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
168       context_.tags().tags();
169   tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
170   tags.emplace_back(ClientStatusTagKey(), StatusCodeToString(status_code_));
171   ::opencensus::stats::Record(
172       {{RpcClientRoundtripLatency(), latency_ms},
173        {RpcClientSentMessagesPerRpc(), sent_message_count_},
174        {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
175       tags);
176   if (status_code_ != absl::StatusCode::kOk) {
177     context_.Span().SetStatus(opencensus::trace::StatusCode(status_code_),
178                               StatusCodeToString(status_code_));
179   }
180   context_.EndSpan();
181   grpc_core::MutexLock lock(&parent_->mu_);
182   if (--parent_->num_active_rpcs_ == 0) {
183     parent_->time_at_last_attempt_end_ = absl::Now();
184   }
185   if (arena_allocated_) {
186     this->~OpenCensusCallAttemptTracer();
187   } else {
188     delete this;
189   }
190 }
191
192 //
193 // OpenCensusCallTracer
194 //
195
196 OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args)
197     : call_context_(args->context),
198       path_(grpc_slice_ref_internal(args->path)),
199       method_(GetMethod(&path_)),
200       arena_(args->arena) {}
201
202 OpenCensusCallTracer::~OpenCensusCallTracer() {
203   std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
204       context_.tags().tags();
205   tags.emplace_back(ClientMethodTagKey(), std::string(method_));
206   ::opencensus::stats::Record(
207       {{RpcClientRetriesPerCall(), retries_ - 1},  // exclude first attempt
208        {RpcClientTransparentRetriesPerCall(), transparent_retries_},
209        {RpcClientRetryDelayPerCall(), ToDoubleMilliseconds(retry_delay_)}},
210       tags);
211   grpc_slice_unref_internal(path_);
212 }
213
214 OpenCensusCallTracer::OpenCensusCallAttemptTracer*
215 OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
216   // We allocate the first attempt on the arena and all subsequent attempts on
217   // the heap, so that in the common case we don't require a heap allocation,
218   // nor do we unnecessarily grow the arena.
219   bool is_first_attempt = true;
220   uint64_t attempt_num;
221   {
222     grpc_core::MutexLock lock(&mu_);
223     if (transparent_retries_ != 0 || retries_ != 0) {
224       is_first_attempt = false;
225       if (num_active_rpcs_ == 0) {
226         retry_delay_ += absl::Now() - time_at_last_attempt_end_;
227       }
228     }
229     attempt_num = retries_;
230     if (is_transparent_retry) {
231       ++transparent_retries_;
232     } else {
233       ++retries_;
234     }
235     ++num_active_rpcs_;
236   }
237   if (is_first_attempt) {
238     // Note that we are generating the overall call context here instead of in
239     // the constructor of `OpenCensusCallTracer` due to the semantics of
240     // `grpc_census_call_set_context` which allows the application to set the
241     // census context for a call anytime before the first call to
242     // `grpc_call_start_batch`.
243     auto* parent_context = reinterpret_cast<CensusContext*>(
244         call_context_[GRPC_CONTEXT_TRACING].value);
245     GenerateClientContext(
246         absl::StrCat("Sent.", method_), &context_,
247         (parent_context == nullptr) ? nullptr : parent_context);
248     return arena_->New<OpenCensusCallAttemptTracer>(
249         this, attempt_num, is_transparent_retry, true /* arena_allocated */);
250   }
251   return new OpenCensusCallAttemptTracer(
252       this, attempt_num, is_transparent_retry, false /* arena_allocated */);
253 }
254
255 }  // namespace grpc