3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
25 #include "src/cpp/ext/filters/census/client_filter.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/string_view.h"
29 #include "opencensus/stats/stats.h"
30 #include "opencensus/tags/context_util.h"
31 #include "opencensus/tags/tag_key.h"
32 #include "opencensus/tags/tag_map.h"
33 #include "src/core/lib/surface/call.h"
34 #include "src/cpp/ext/filters/census/grpc_plugin.h"
35 #include "src/cpp/ext/filters/census/measures.h"
// NOTE(review): these two lines look like the tails of out-of-line
// definitions for the static members kMaxTraceContextLen and kMaxTagsLen of
// OpenCensusCallAttemptTracer. The specifier/type part of each definition is
// not visible here -- confirm against the class declaration.
40 OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTraceContextLen;
42 OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen;
// Per-call initialization of the census client filter. Creates an
// OpenCensusCallTracer on the call arena and publishes it in the
// GRPC_CONTEXT_CALL_TRACER slot of the call context. The call element
// argument is unused. Returns GRPC_ERROR_NONE.
44 grpc_error_handle CensusClientCallData::Init(
45 grpc_call_element* /* elem */, const grpc_call_element_args* args) {
46 auto tracer = args->arena->New<OpenCensusCallTracer>(args);
// The call-tracer slot is asserted to be empty before it is filled in.
47 GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
48 args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
// The tracer was created with arena->New, so the destroy hook invokes the
// destructor explicitly on the stored pointer.
49 args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) {
50 (static_cast<OpenCensusCallTracer*>(tracer))->~OpenCensusCallTracer();
52 return GRPC_ERROR_NONE;
56 // OpenCensusCallTracer::OpenCensusCallAttemptTracer
// Builds the CensusContext used for a single call attempt. The context is
// named "Attempt.<method>" and is constructed from a pointer to the parent
// context's span together with the parent context's tags.
// parent_context is asserted (debug assertion) to hold a valid context.
61 CensusContext CreateCensusContextForCallAttempt(
62 absl::string_view method, const CensusContext& parent_context) {
63 GPR_DEBUG_ASSERT(parent_context.Context().IsValid());
64 return CensusContext(absl::StrCat("Attempt.", method), &parent_context.Span(),
65 parent_context.tags());
// Constructs the tracer for one attempt of a call.
//   parent               - the call tracer this attempt belongs to; its
//                          method_ is used to create the attempt context.
//   attempt_num          - recorded on the span as "previous-rpc-attempts".
//   is_transparent_retry - recorded on the span as "transparent-retry".
//   arena_allocated      - stored in arena_allocated_; RecordEnd() checks it
//                          before running the destructor in place.
// The attempt start time is taken from absl::Now() at construction.
70 OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
71 OpenCensusCallTracer* parent, uint64_t attempt_num,
72 bool is_transparent_retry, bool arena_allocated)
74 arena_allocated_(arena_allocated),
75 context_(CreateCensusContextForCallAttempt(parent_->method_,
77 start_time_(absl::Now()) {
78 context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
79 context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
// Zero the linked metadata elements that RecordSendInitialMetadata() later
// hands to grpc_metadata_batch_add_tail for the tags and trace headers.
80 memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem));
81 memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem));
// Adds census headers to the outgoing initial metadata of this attempt:
//   - the serialized trace context, keyed by GRPC_MDSTR_GRPC_TRACE_BIN;
//   - the serialized stats tags, keyed by GRPC_MDSTR_GRPC_TAGS_BIN.
// Each header is added only when its serialized length is non-zero.
// The flags argument is unused.
84 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
85 RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata,
86 uint32_t /* flags */) {
// Serialize this attempt's trace context into tracing_buf_.
87 size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
89 if (tracing_len > 0) {
// The slice is built directly over tracing_buf_ (UnmanagedMemorySlice), and
// tracing_bin_ is the linked element passed to the batch for this header.
92 grpc_metadata_batch_add_tail(
93 send_initial_metadata, &tracing_bin_,
94 grpc_mdelem_from_slices(
95 GRPC_MDSTR_GRPC_TRACE_BIN,
96 grpc_core::UnmanagedMemorySlice(tracing_buf_, tracing_len)),
97 GRPC_BATCH_GRPC_TRACE_BIN));
// Tags header: start from an empty slice and let StatsContextSerialize fill
// it in; stats_bin_ is the linked element passed to the batch for this header.
99 grpc_slice tags = grpc_empty_slice();
100 // TODO(unknown): Add in tagging serialization.
101 size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
102 if (encoded_tags_len > 0) {
104 "census grpc_filter",
105 grpc_metadata_batch_add_tail(
106 send_initial_metadata, &stats_bin_,
107 grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags),
108 GRPC_BATCH_GRPC_TAGS_BIN));
// Counts one sent message for this attempt. The message itself is not
// inspected; the count is reported by RecordEnd().
112 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
113 const grpc_core::ByteStream& /* send_message */) {
114 ++sent_message_count_;
// Counts one received message for this attempt. The message itself is not
// inspected; the count is reported by RecordEnd().
117 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
118 const grpc_core::ByteStream& /* recv_message */) {
119 ++recv_message_count_;
// When the trailing metadata batch `b` has a grpc_server_stats_bin entry,
// passes that entry's value bytes (start pointer and length) to
// ServerStatsDeserialize and then removes the entry from the batch.
// NOTE(review): elapsed_time is presumably the output of the deserialization
// (the caller later reads it as nanoseconds) -- confirm; the final argument
// of the ServerStatsDeserialize call is not visible here.
// `b` is dereferenced unconditionally, so it must not be null.
124 void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
125 if (b->idx.named.grpc_server_stats_bin != nullptr) {
126 ServerStatsDeserialize(
127 reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(
128 GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))),
129 GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)),
131 grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin);
// Handles the trailing metadata of this attempt:
//   - runs FilterTrailingMetadata on recv_trailing_metadata, with
//     elapsed_time_ as its output argument;
//   - stores status.code() in status_code_ for later use by RecordEnd();
//   - records sent bytes, received bytes and server latency measures, and
//     builds a tag set with the client method and the final status string.
// NOTE(review): recv_trailing_metadata is dereferenced unconditionally inside
// FilterTrailingMetadata; callers presumably never pass nullptr -- confirm.
137 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
138 RecordReceivedTrailingMetadata(
139 absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
140 const grpc_transport_stream_stats& transport_stream_stats) {
141 FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time_);
// Byte counts come from the transport's per-stream data_bytes statistics.
142 const uint64_t request_size = transport_stream_stats.outgoing.data_bytes;
143 const uint64_t response_size = transport_stream_stats.incoming.data_bytes;
// Start from a copy of the attempt context's tags and append method/status.
144 std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
145 context_.tags().tags();
146 tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
147 status_code_ = status.code();
148 std::string final_status = absl::StatusCodeToString(status_code_);
149 tags.emplace_back(ClientStatusTagKey(), final_status);
// elapsed_time_ is interpreted as nanoseconds and reported in milliseconds.
150 ::opencensus::stats::Record(
151 {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
152 {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
153 {RpcClientServerLatency(),
154 ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}},
// Marks this attempt as cancelled: status_code_ becomes kCancelled, which
// RecordEnd() later uses for the status tag and span status. The passed-in
// cancel_error is only unreffed (GRPC_ERROR_UNREF); its contents are not read.
158 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
159 grpc_error_handle cancel_error) {
160 status_code_ = absl::StatusCode::kCancelled;
161 GRPC_ERROR_UNREF(cancel_error);
// Finishes this attempt:
//   - records round-trip latency and the sent/received message counts, and
//     builds a tag set with the client method and status;
//   - sets the span status when status_code_ is not kOk;
//   - updates the parent's active-attempt bookkeeping under parent_->mu_;
//   - runs this object's destructor in place when it was arena-allocated.
// The latency argument is ignored; latency is measured here as
// absl::Now() - start_time_.
164 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
165 const gpr_timespec& /* latency */) {
166 double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
// Start from a copy of the attempt context's tags and append method/status.
167 std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
168 context_.tags().tags();
169 tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
170 tags.emplace_back(ClientStatusTagKey(), StatusCodeToString(status_code_));
171 ::opencensus::stats::Record(
172 {{RpcClientRoundtripLatency(), latency_ms},
173 {RpcClientSentMessagesPerRpc(), sent_message_count_},
174 {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
// Only non-OK outcomes are written to the span status.
176 if (status_code_ != absl::StatusCode::kOk) {
177 context_.Span().SetStatus(opencensus::trace::StatusCode(status_code_),
178 StatusCodeToString(status_code_));
// When this was the last active attempt, remember when it ended;
// StartNewAttempt() uses that timestamp to accumulate retry delay.
181 grpc_core::MutexLock lock(&parent_->mu_);
182 if (--parent_->num_active_rpcs_ == 0) {
183 parent_->time_at_last_attempt_end_ = absl::Now();
// Arena-allocated tracers are destroyed with an explicit destructor call;
// the object must not be used after this point.
185 if (arena_allocated_) {
186 this->~OpenCensusCallAttemptTracer();
193 // OpenCensusCallTracer
// Captures the call's context array, path and arena from `args`.
// A reference to the path slice is taken here and released in the
// destructor; method_ is derived from the stored path via GetMethod.
// NOTE(review): method_ is computed from &path_, so path_ must be declared
// before method_ in the class for this initialization to be valid -- confirm
// against the header.
196 OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args)
197 : call_context_(args->context),
198 path_(grpc_slice_ref_internal(args->path)),
199 method_(GetMethod(&path_)),
200 arena_(args->arena) {}
// Records the per-call retry measures (retries, transparent retries and
// accumulated retry delay in milliseconds), builds a tag set from the call
// context's tags plus the client method, and then releases the reference to
// the path slice taken in the constructor.
// NOTE(review): `retries_ - 1` assumes at least one attempt was counted; if
// retries_ is an unsigned counter that can still be 0 here, the subtraction
// wraps -- confirm.
202 OpenCensusCallTracer::~OpenCensusCallTracer() {
203 std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
204 context_.tags().tags();
205 tags.emplace_back(ClientMethodTagKey(), std::string(method_));
206 ::opencensus::stats::Record(
207 {{RpcClientRetriesPerCall(), retries_ - 1}, // exclude first attempt
208 {RpcClientTransparentRetriesPerCall(), transparent_retries_},
209 {RpcClientRetryDelayPerCall(), ToDoubleMilliseconds(retry_delay_)}},
211 grpc_slice_unref_internal(path_);
// Starts tracing a new attempt of this call and returns its attempt tracer.
// is_transparent_retry is forwarded to the attempt tracer; a transparent
// retry increments transparent_retries_.
// The first attempt also generates the call-level client context ("Sent." +
// method) and is allocated on the arena; later attempts are allocated with
// `new`.
214 OpenCensusCallTracer::OpenCensusCallAttemptTracer*
215 OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
216 // We allocate the first attempt on the arena and all subsequent attempts on
217 // the heap, so that in the common case we don't require a heap allocation,
218 // nor do we unnecessarily grow the arena.
219 bool is_first_attempt = true;
220 uint64_t attempt_num;
// Retry bookkeeping is done while holding mu_.
222 grpc_core::MutexLock lock(&mu_);
// A non-zero counter means an earlier attempt was already counted.
223 if (transparent_retries_ != 0 || retries_ != 0) {
224 is_first_attempt = false;
// No attempt is in flight: the time since the last attempt ended is added
// to the accumulated retry delay.
225 if (num_active_rpcs_ == 0) {
226 retry_delay_ += absl::Now() - time_at_last_attempt_end_;
// The attempt number given to the tracer is the current value of retries_.
229 attempt_num = retries_;
230 if (is_transparent_retry) {
231 ++transparent_retries_;
237 if (is_first_attempt) {
238 // Note that we are generating the overall call context here instead of in
239 // the constructor of `OpenCensusCallTracer` due to the semantics of
240 // `grpc_census_call_set_context` which allows the application to set the
241 // census context for a call anytime before the first call to
242 // `grpc_call_start_batch`.
// The parent context, if any, is read from the GRPC_CONTEXT_TRACING slot.
243 auto* parent_context = reinterpret_cast<CensusContext*>(
244 call_context_[GRPC_CONTEXT_TRACING].value);
245 GenerateClientContext(
246 absl::StrCat("Sent.", method_), &context_,
247 (parent_context == nullptr) ? nullptr : parent_context);
248 return arena_->New<OpenCensusCallAttemptTracer>(
249 this, attempt_num, is_transparent_retry, true /* arena_allocated */);
// Not the first attempt: allocate the tracer with `new`.
251 return new OpenCensusCallAttemptTracer(
252 this, attempt_num, is_transparent_retry, false /* arena_allocated */);