return Err;
// Close the response message.
- return C.endSendMessage();
+ if (auto Err = C.endSendMessage())
+ return Err;
+ return C.send();
}
template <typename ChannelT, typename FunctionIdT, typename SequenceNumberT>
return Err2;
if (auto Err2 = serializeSeq(C, std::move(Err)))
return Err2;
- return C.endSendMessage();
+ if (auto Err2 = C.endSendMessage())
+ return Err2;
+ return C.send();
}
};
C, *ResultOrErr))
return Err;
- // Close the response message.
- return C.endSendMessage();
+ // End the response message.
+ if (auto Err = C.endSendMessage())
+ return Err;
+
+ return C.send();
}
template <typename ChannelT, typename FunctionIdT, typename SequenceNumberT>
return Err;
if (auto Err2 = C.startSendMessage(ResponseId, SeqNo))
return Err2;
- return C.endSendMessage();
+ if (auto Err2 = C.endSendMessage())
+ return Err2;
+ return C.send();
}
};
return std::move(Err);
}
+ if (auto Err = this->C.send()) {
+ detail::ResultTraits<typename Func::ReturnType>::consumeAbandoned(
+ std::move(Result));
+ return std::move(Err);
+ }
+
while (!ReceivedResponse) {
if (auto Err = this->handleOne()) {
detail::ResultTraits<typename Func::ReturnType>::consumeAbandoned(
QueueChannel(QueueChannel&&) = delete;
QueueChannel& operator=(QueueChannel&&) = delete;
+ template <typename FunctionIdT, typename SequenceIdT>
+ Error startSendMessage(const FunctionIdT &FnId, const SequenceIdT &SeqNo) {
+ ++InFlightOutgoingMessages;
+ return orc::rpc::RawByteChannel::startSendMessage(FnId, SeqNo);
+ }
+
+ Error endSendMessage() {
+ --InFlightOutgoingMessages;
+ ++CompletedOutgoingMessages;
+ return orc::rpc::RawByteChannel::endSendMessage();
+ }
+
+ template <typename FunctionIdT, typename SequenceNumberT>
+ Error startReceiveMessage(FunctionIdT &FnId, SequenceNumberT &SeqNo) {
+ ++InFlightIncomingMessages;
+ return orc::rpc::RawByteChannel::startReceiveMessage(FnId, SeqNo);
+ }
+
+ Error endReceiveMessage() {
+ --InFlightIncomingMessages;
+ ++CompletedIncomingMessages;
+ return orc::rpc::RawByteChannel::endReceiveMessage();
+ }
+
Error readBytes(char *Dst, unsigned Size) override {
std::unique_lock<std::mutex> Lock(InQueue->getMutex());
while (Size) {
return Error::success();
}
- Error send() override { return Error::success(); }
+ Error send() override {
+ ++SendCalls;
+ return Error::success();
+ }
void close() {
auto ChannelClosed = []() { return make_error<QueueChannelClosedError>(); };
uint64_t NumWritten = 0;
uint64_t NumRead = 0;
+ std::atomic<size_t> InFlightIncomingMessages{0};
+ std::atomic<size_t> CompletedIncomingMessages{0};
+ std::atomic<size_t> InFlightOutgoingMessages{0};
+ std::atomic<size_t> CompletedOutgoingMessages{0};
+ std::atomic<size_t> SendCalls{0};
private:
EXPECT_FALSE(!!Err) << "Client failed to handle response from void(bool)";
}
+ // The client should have made two calls to send: One implicit call to
+ // negotiate the VoidBool function key, and a second to make the VoidBool
+ // call.
+ EXPECT_EQ(Channels.first->SendCalls, 2U)
+ << "Expected one send call to have been made by client";
+
+ // The server should have made two calls to send: One to send the response to
+ // the negotiate call, and another to send the response to the VoidBool call.
+ EXPECT_EQ(Channels.second->SendCalls, 2U)
+ << "Expected two send calls to have been made by server";
+
ServerThread.join();
}