1 // Copyright 2021 gRPC authors.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 #include <grpc/impl/codegen/port_platform.h>
17 #include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h"
19 #include <grpc/support/log.h>
25 namespace grpc_binder {
26 void TransportStreamReceiverImpl::RegisterRecvInitialMetadata(
27 StreamIdentifier id, std::function<void(const Metadata&)> cb) {
28 // TODO(mingcl): Don't lock the whole function
29 grpc_core::MutexLock l(&m_);
30 gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
31 GPR_ASSERT(initial_metadata_cbs_.count(id) == 0);
32 auto iter = pending_initial_metadata_.find(id);
33 if (iter == pending_initial_metadata_.end()) {
34 initial_metadata_cbs_[id] = std::move(cb);
36 cb(iter->second.front());
38 if (iter->second.empty()) {
39 pending_initial_metadata_.erase(iter);
44 void TransportStreamReceiverImpl::RegisterRecvMessage(
45 StreamIdentifier id, std::function<void(const std::string&)> cb) {
46 // TODO(mingcl): Don't lock the whole function
47 grpc_core::MutexLock l(&m_);
48 gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
49 GPR_ASSERT(message_cbs_.count(id) == 0);
50 auto iter = pending_message_.find(id);
51 if (iter == pending_message_.end()) {
52 message_cbs_[id] = std::move(cb);
54 cb(iter->second.front());
56 if (iter->second.empty()) {
57 pending_message_.erase(iter);
62 void TransportStreamReceiverImpl::RegisterRecvTrailingMetadata(
63 StreamIdentifier id, std::function<void(const Metadata&, int)> cb) {
64 // TODO(mingcl): Don't lock the whole function
65 grpc_core::MutexLock l(&m_);
66 gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
67 GPR_ASSERT(trailing_metadata_cbs_.count(id) == 0);
68 auto iter = pending_trailing_metadata_.find(id);
69 if (iter == pending_trailing_metadata_.end()) {
70 trailing_metadata_cbs_[id] = std::move(cb);
73 const auto& p = iter->second.front();
74 cb(p.first, p.second);
77 if (iter->second.empty()) {
78 pending_trailing_metadata_.erase(iter);
83 void TransportStreamReceiverImpl::NotifyRecvInitialMetadata(
84 StreamIdentifier id, const Metadata& initial_metadata) {
85 gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
86 std::function<void(const Metadata&)> cb;
88 grpc_core::MutexLock l(&m_);
89 auto iter = initial_metadata_cbs_.find(id);
90 if (iter != initial_metadata_cbs_.end()) {
92 initial_metadata_cbs_.erase(iter);
94 pending_initial_metadata_[id].push(initial_metadata);
102 void TransportStreamReceiverImpl::NotifyRecvMessage(
103 StreamIdentifier id, const std::string& message) {
104 gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
105 std::function<void(const std::string&)> cb;
107 grpc_core::MutexLock l(&m_);
108 auto iter = message_cbs_.find(id);
109 if (iter != message_cbs_.end()) {
111 message_cbs_.erase(iter);
113 pending_message_[id].push(message);
121 void TransportStreamReceiverImpl::NotifyRecvTrailingMetadata(
122 StreamIdentifier id, const Metadata& trailing_metadata, int status) {
123 gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
124 std::function<void(const Metadata&, int)> cb;
126 grpc_core::MutexLock l(&m_);
127 auto iter = trailing_metadata_cbs_.find(id);
128 if (iter != trailing_metadata_cbs_.end()) {
130 trailing_metadata_cbs_.erase(iter);
132 pending_trailing_metadata_[id].emplace(trailing_metadata, status);
136 cb(trailing_metadata, status);
139 } // namespace grpc_binder