Imported Upstream version 1.36.0
[platform/upstream/grpc.git] / src / cpp / ext / filters / census / server_filter.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/cpp/ext/filters/census/server_filter.h"
22
23 #include "absl/strings/str_cat.h"
24 #include "absl/strings/string_view.h"
25 #include "absl/time/clock.h"
26 #include "absl/time/time.h"
27 #include "opencensus/stats/stats.h"
28 #include "src/core/lib/surface/call.h"
29 #include "src/cpp/ext/filters/census/grpc_plugin.h"
30 #include "src/cpp/ext/filters/census/measures.h"
31
32 namespace grpc {
33
// Out-of-class definition of the static constexpr member declared in
// CensusServerCallData; it gives the constant storage in case it is ODR-used
// (needed by language standards before C++17).
constexpr uint32_t CensusServerCallData::kMaxServerStatsLen;
35
36 namespace {
37
38 // server metadata elements
// Slices extracted from a call's incoming initial metadata by
// FilterInitialMetadata(). The members have no default initializers, so a
// caller must set each one before reading or unreffing it.
struct ServerMetadataElements {
  // Value of the `path` metadata entry.
  grpc_slice path;
  // Value of the `grpc_trace_bin` metadata entry.
  grpc_slice tracing_slice;
  // Value of the `grpc_tags_bin` metadata entry.
  grpc_slice census_proto;
};
44
45 void FilterInitialMetadata(grpc_metadata_batch* b,
46                            ServerMetadataElements* sml) {
47   if (b->idx.named.path != nullptr) {
48     sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md));
49   }
50   if (b->idx.named.grpc_trace_bin != nullptr) {
51     sml->tracing_slice =
52         grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md));
53     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_TRACE_BIN);
54   }
55   if (b->idx.named.grpc_tags_bin != nullptr) {
56     sml->census_proto =
57         grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md));
58     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_TAGS_BIN);
59   }
60 }
61
62 }  // namespace
63
64 void CensusServerCallData::OnDoneRecvMessageCb(void* user_data,
65                                                grpc_error* error) {
66   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
67   CensusServerCallData* calld =
68       reinterpret_cast<CensusServerCallData*>(elem->call_data);
69   CensusChannelData* channeld =
70       reinterpret_cast<CensusChannelData*>(elem->channel_data);
71   GPR_ASSERT(calld != nullptr);
72   GPR_ASSERT(channeld != nullptr);
73   // Stream messages are no longer valid after receiving trailing metadata.
74   if ((*calld->recv_message_) != nullptr) {
75     ++calld->recv_message_count_;
76   }
77   grpc_core::Closure::Run(DEBUG_LOCATION, calld->initial_on_done_recv_message_,
78                           GRPC_ERROR_REF(error));
79 }
80
81 void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
82                                                        grpc_error* error) {
83   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
84   CensusServerCallData* calld =
85       reinterpret_cast<CensusServerCallData*>(elem->call_data);
86   GPR_ASSERT(calld != nullptr);
87   if (error == GRPC_ERROR_NONE) {
88     grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_;
89     GPR_ASSERT(initial_metadata != nullptr);
90     ServerMetadataElements sml;
91     sml.path = grpc_empty_slice();
92     sml.tracing_slice = grpc_empty_slice();
93     sml.census_proto = grpc_empty_slice();
94     FilterInitialMetadata(initial_metadata, &sml);
95     calld->path_ = grpc_slice_ref_internal(sml.path);
96     calld->method_ = GetMethod(&calld->path_);
97     calld->qualified_method_ = absl::StrCat("Recv.", calld->method_);
98     const char* tracing_str =
99         GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
100             ? ""
101             : reinterpret_cast<const char*>(
102                   GRPC_SLICE_START_PTR(sml.tracing_slice));
103     size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
104                                  ? 0
105                                  : GRPC_SLICE_LENGTH(sml.tracing_slice);
106     GenerateServerContext(absl::string_view(tracing_str, tracing_str_len),
107                           calld->qualified_method_, &calld->context_);
108     grpc_slice_unref_internal(sml.tracing_slice);
109     grpc_slice_unref_internal(sml.census_proto);
110     grpc_slice_unref_internal(sml.path);
111     grpc_census_call_set_context(
112         calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
113   }
114   grpc_core::Closure::Run(DEBUG_LOCATION,
115                           calld->initial_on_done_recv_initial_metadata_,
116                           GRPC_ERROR_REF(error));
117 }
118
119 void CensusServerCallData::StartTransportStreamOpBatch(
120     grpc_call_element* elem, TransportStreamOpBatch* op) {
121   if (op->recv_initial_metadata() != nullptr) {
122     // substitute our callback for the op callback
123     recv_initial_metadata_ = op->recv_initial_metadata()->batch();
124     initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready();
125     op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_);
126   }
127   if (op->send_message() != nullptr) {
128     ++sent_message_count_;
129   }
130   if (op->recv_message() != nullptr) {
131     recv_message_ = op->op()->payload->recv_message.recv_message;
132     initial_on_done_recv_message_ =
133         op->op()->payload->recv_message.recv_message_ready;
134     op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
135   }
136   // We need to record the time when the trailing metadata was sent to mark the
137   // completeness of the request.
138   if (op->send_trailing_metadata() != nullptr) {
139     elapsed_time_ = absl::Now() - start_time_;
140     size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
141                                       stats_buf_, kMaxServerStatsLen);
142     if (len > 0) {
143       GRPC_LOG_IF_ERROR(
144           "census grpc_filter",
145           grpc_metadata_batch_add_tail(
146               op->send_trailing_metadata()->batch(), &census_bin_,
147               grpc_mdelem_from_slices(
148                   GRPC_MDSTR_GRPC_SERVER_STATS_BIN,
149                   grpc_core::UnmanagedMemorySlice(stats_buf_, len)),
150               GRPC_BATCH_GRPC_SERVER_STATS_BIN));
151     }
152   }
153   // Call next op.
154   grpc_call_next_op(elem, op->op());
155 }
156
157 grpc_error* CensusServerCallData::Init(grpc_call_element* elem,
158                                        const grpc_call_element_args* args) {
159   start_time_ = absl::Now();
160   gc_ =
161       grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0));
162   GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_,
163                     OnDoneRecvInitialMetadataCb, elem,
164                     grpc_schedule_on_exec_ctx);
165   GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
166                     grpc_schedule_on_exec_ctx);
167   auth_context_ = grpc_call_auth_context(gc_);
168   return GRPC_ERROR_NONE;
169 }
170
171 void CensusServerCallData::Destroy(grpc_call_element* /*elem*/,
172                                    const grpc_call_final_info* final_info,
173                                    grpc_closure* /*then_call_closure*/) {
174   const uint64_t request_size = GetOutgoingDataSize(final_info);
175   const uint64_t response_size = GetIncomingDataSize(final_info);
176   double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
177   grpc_auth_context_release(auth_context_);
178   ::opencensus::stats::Record(
179       {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
180        {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
181        {RpcServerServerLatency(), elapsed_time_ms},
182        {RpcServerSentMessagesPerRpc(), sent_message_count_},
183        {RpcServerReceivedMessagesPerRpc(), recv_message_count_}},
184       {{ServerMethodTagKey(), method_},
185        {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}});
186   grpc_slice_unref_internal(path_);
187   context_.EndSpan();
188 }
189
190 }  // namespace grpc