f58520bd89b29cfd879eb2368ade536355432539
[platform/upstream/grpc.git] / src / core / ext / transport / binder / utils / transport_stream_receiver_impl.cc
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/impl/codegen/port_platform.h>
16
17 #include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h"
18
19 #include <grpc/support/log.h>
20
21 #include <functional>
22 #include <string>
23 #include <utility>
24
25 namespace grpc_binder {
26 void TransportStreamReceiverImpl::RegisterRecvInitialMetadata(
27     StreamIdentifier id, std::function<void(const Metadata&)> cb) {
28   // TODO(mingcl): Don't lock the whole function
29   grpc_core::MutexLock l(&m_);
30   gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
31   GPR_ASSERT(initial_metadata_cbs_.count(id) == 0);
32   auto iter = pending_initial_metadata_.find(id);
33   if (iter == pending_initial_metadata_.end()) {
34     initial_metadata_cbs_[id] = std::move(cb);
35   } else {
36     cb(iter->second.front());
37     iter->second.pop();
38     if (iter->second.empty()) {
39       pending_initial_metadata_.erase(iter);
40     }
41   }
42 }
43
44 void TransportStreamReceiverImpl::RegisterRecvMessage(
45     StreamIdentifier id, std::function<void(const std::string&)> cb) {
46   // TODO(mingcl): Don't lock the whole function
47   grpc_core::MutexLock l(&m_);
48   gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
49   GPR_ASSERT(message_cbs_.count(id) == 0);
50   auto iter = pending_message_.find(id);
51   if (iter == pending_message_.end()) {
52     message_cbs_[id] = std::move(cb);
53   } else {
54     cb(iter->second.front());
55     iter->second.pop();
56     if (iter->second.empty()) {
57       pending_message_.erase(iter);
58     }
59   }
60 }
61
62 void TransportStreamReceiverImpl::RegisterRecvTrailingMetadata(
63     StreamIdentifier id, std::function<void(const Metadata&, int)> cb) {
64   // TODO(mingcl): Don't lock the whole function
65   grpc_core::MutexLock l(&m_);
66   gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
67   GPR_ASSERT(trailing_metadata_cbs_.count(id) == 0);
68   auto iter = pending_trailing_metadata_.find(id);
69   if (iter == pending_trailing_metadata_.end()) {
70     trailing_metadata_cbs_[id] = std::move(cb);
71   } else {
72     {
73       const auto& p = iter->second.front();
74       cb(p.first, p.second);
75     }
76     iter->second.pop();
77     if (iter->second.empty()) {
78       pending_trailing_metadata_.erase(iter);
79     }
80   }
81 }
82
83 void TransportStreamReceiverImpl::NotifyRecvInitialMetadata(
84     StreamIdentifier id, const Metadata& initial_metadata) {
85   gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
86   std::function<void(const Metadata&)> cb;
87   {
88     grpc_core::MutexLock l(&m_);
89     auto iter = initial_metadata_cbs_.find(id);
90     if (iter != initial_metadata_cbs_.end()) {
91       cb = iter->second;
92       initial_metadata_cbs_.erase(iter);
93     } else {
94       pending_initial_metadata_[id].push(initial_metadata);
95     }
96   }
97   if (cb != nullptr) {
98     cb(initial_metadata);
99   }
100 }
101
102 void TransportStreamReceiverImpl::NotifyRecvMessage(
103     StreamIdentifier id, const std::string& message) {
104   gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
105   std::function<void(const std::string&)> cb;
106   {
107     grpc_core::MutexLock l(&m_);
108     auto iter = message_cbs_.find(id);
109     if (iter != message_cbs_.end()) {
110       cb = iter->second;
111       message_cbs_.erase(iter);
112     } else {
113       pending_message_[id].push(message);
114     }
115   }
116   if (cb != nullptr) {
117     cb(message);
118   }
119 }
120
121 void TransportStreamReceiverImpl::NotifyRecvTrailingMetadata(
122     StreamIdentifier id, const Metadata& trailing_metadata, int status) {
123   gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
124   std::function<void(const Metadata&, int)> cb;
125   {
126     grpc_core::MutexLock l(&m_);
127     auto iter = trailing_metadata_cbs_.find(id);
128     if (iter != trailing_metadata_cbs_.end()) {
129       cb = iter->second;
130       trailing_metadata_cbs_.erase(iter);
131     } else {
132       pending_trailing_metadata_[id].emplace(trailing_metadata, status);
133     }
134   }
135   if (cb != nullptr) {
136     cb(trailing_metadata, status);
137   }
138 }
139 }  // namespace grpc_binder