#include <memory>
#include <vector>
+#include "absl/memory/memory.h"
+
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
+#include <grpcpp/create_channel_posix.h>
#include <grpcpp/generic/generic_stub.h>
#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
+#include <grpcpp/server_posix.h>
#include <grpcpp/support/client_interceptor.h>
+#include "src/core/lib/iomgr/port.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/byte_buffer_proto_helper.h"
#include "test/cpp/util/string_ref_helper.h"
+#ifdef GRPC_POSIX_SOCKET
+#include <fcntl.h>
+#include "src/core/lib/iomgr/socket_utils_posix.h"
+#endif /* GRPC_POSIX_SOCKET */
+
#include <gtest/gtest.h>
namespace grpc {
kAsyncCQBidiStreaming,
};
+enum class ChannelType {
+ kHttpChannel,
+ kFdChannel,
+};
+
/* Hijacks Echo RPC and fills in the expected values */
class HijackingInterceptor : public experimental::Interceptor {
public:
EXPECT_EQ(info->type(), experimental::ClientRpcInfo::Type::UNARY);
}
- virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
+ void Intercept(experimental::InterceptorBatchMethods* methods) override {
bool hijack = false;
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
class HijackingInterceptorFactory
: public experimental::ClientInterceptorFactoryInterface {
public:
- virtual experimental::Interceptor* CreateClientInterceptor(
+ experimental::Interceptor* CreateClientInterceptor(
experimental::ClientRpcInfo* info) override {
return new HijackingInterceptor(info);
}
EXPECT_EQ(strcmp("/grpc.testing.EchoTestService/Echo", info->method()), 0);
}
- virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
+ void Intercept(experimental::InterceptorBatchMethods* methods) override {
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
auto* map = methods->GetSendInitialMetadata();
class HijackingInterceptorMakesAnotherCallFactory
: public experimental::ClientInterceptorFactoryInterface {
public:
- virtual experimental::Interceptor* CreateClientInterceptor(
+ experimental::Interceptor* CreateClientInterceptor(
experimental::ClientRpcInfo* info) override {
return new HijackingInterceptorMakesAnotherCall(info);
}
info_ = info;
}
- virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
+ void Intercept(experimental::InterceptorBatchMethods* methods) override {
bool hijack = false;
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
ClientStreamingRpcHijackingInterceptor(experimental::ClientRpcInfo* info) {
info_ = info;
}
- virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
+ void Intercept(experimental::InterceptorBatchMethods* methods) override {
bool hijack = false;
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
class ClientStreamingRpcHijackingInterceptorFactory
: public experimental::ClientInterceptorFactoryInterface {
public:
- virtual experimental::Interceptor* CreateClientInterceptor(
+ experimental::Interceptor* CreateClientInterceptor(
experimental::ClientRpcInfo* info) override {
return new ClientStreamingRpcHijackingInterceptor(info);
}
got_failed_message_ = false;
}
- virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
+ void Intercept(experimental::InterceptorBatchMethods* methods) override {
bool hijack = false;
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
class ServerStreamingRpcHijackingInterceptorFactory
: public experimental::ClientInterceptorFactoryInterface {
public:
- virtual experimental::Interceptor* CreateClientInterceptor(
+ experimental::Interceptor* CreateClientInterceptor(
experimental::ClientRpcInfo* info) override {
return new ServerStreamingRpcHijackingInterceptor(info);
}
class BidiStreamingRpcHijackingInterceptorFactory
: public experimental::ClientInterceptorFactoryInterface {
public:
- virtual experimental::Interceptor* CreateClientInterceptor(
+ experimental::Interceptor* CreateClientInterceptor(
experimental::ClientRpcInfo* info) override {
return new BidiStreamingRpcHijackingInterceptor(info);
}
post_recv_status_ = false;
}
- virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
+ void Intercept(experimental::InterceptorBatchMethods* methods) override {
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
auto* map = methods->GetSendInitialMetadata();
class LoggingInterceptorFactory
: public experimental::ClientInterceptorFactoryInterface {
public:
- virtual experimental::Interceptor* CreateClientInterceptor(
+ experimental::Interceptor* CreateClientInterceptor(
experimental::ClientRpcInfo* info) override {
return new LoggingInterceptor(info);
}
class TestScenario {
public:
- explicit TestScenario(const RPCType& type) : type_(type) {}
+ explicit TestScenario(const ChannelType& channel_type,
+ const RPCType& rpc_type)
+ : channel_type_(channel_type), rpc_type_(rpc_type) {}
+
+ ChannelType channel_type() const { return channel_type_; }
- RPCType type() const { return type_; }
+ RPCType rpc_type() const { return rpc_type_; }
private:
- RPCType type_;
+ const ChannelType channel_type_;
+ const RPCType rpc_type_;
};
std::vector<TestScenario> CreateTestScenarios() {
std::vector<TestScenario> scenarios;
- scenarios.emplace_back(RPCType::kSyncUnary);
- scenarios.emplace_back(RPCType::kSyncClientStreaming);
- scenarios.emplace_back(RPCType::kSyncServerStreaming);
- scenarios.emplace_back(RPCType::kSyncBidiStreaming);
- scenarios.emplace_back(RPCType::kAsyncCQUnary);
- scenarios.emplace_back(RPCType::kAsyncCQServerStreaming);
+ std::vector<RPCType> rpc_types;
+ rpc_types.emplace_back(RPCType::kSyncUnary);
+ rpc_types.emplace_back(RPCType::kSyncClientStreaming);
+ rpc_types.emplace_back(RPCType::kSyncServerStreaming);
+ rpc_types.emplace_back(RPCType::kSyncBidiStreaming);
+ rpc_types.emplace_back(RPCType::kAsyncCQUnary);
+ rpc_types.emplace_back(RPCType::kAsyncCQServerStreaming);
+ for (const auto& rpc_type : rpc_types) {
+ scenarios.emplace_back(ChannelType::kHttpChannel, rpc_type);
+// TODO(yashykt): Maybe add support for non-posix sockets too
+#ifdef GRPC_POSIX_SOCKET
+ scenarios.emplace_back(ChannelType::kFdChannel, rpc_type);
+#endif /* GRPC_POSIX_SOCKET */
+ }
return scenarios;
}
: public ::testing::TestWithParam<TestScenario> {
protected:
ParameterizedClientInterceptorsEnd2endTest() {
- int port = grpc_pick_unused_port_or_die();
-
ServerBuilder builder;
- server_address_ = "localhost:" + std::to_string(port);
- builder.AddListeningPort(server_address_, InsecureServerCredentials());
builder.RegisterService(&service_);
- server_ = builder.BuildAndStart();
+ if (GetParam().channel_type() == ChannelType::kHttpChannel) {
+ int port = grpc_pick_unused_port_or_die();
+ server_address_ = "localhost:" + std::to_string(port);
+ builder.AddListeningPort(server_address_, InsecureServerCredentials());
+ server_ = builder.BuildAndStart();
+ }
+#ifdef GRPC_POSIX_SOCKET
+ else if (GetParam().channel_type() == ChannelType::kFdChannel) {
+ int flags;
+ GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv_) == 0);
+ flags = fcntl(sv_[0], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv_[0], F_SETFL, flags | O_NONBLOCK) == 0);
+ flags = fcntl(sv_[1], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv_[1], F_SETFL, flags | O_NONBLOCK) == 0);
+ GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv_[0]) ==
+ GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv_[1]) ==
+ GRPC_ERROR_NONE);
+ server_ = builder.BuildAndStart();
+ AddInsecureChannelFromFd(server_.get(), sv_[1]);
+ }
+#endif /* GRPC_POSIX_SOCKET */
+ }
+
+ ~ParameterizedClientInterceptorsEnd2endTest() override {
+ server_->Shutdown();
}
- ~ParameterizedClientInterceptorsEnd2endTest() { server_->Shutdown(); }
+ std::shared_ptr<grpc::Channel> CreateClientChannel(
+ std::vector<std::unique_ptr<
+ grpc::experimental::ClientInterceptorFactoryInterface>>
+ creators) {
+ if (GetParam().channel_type() == ChannelType::kHttpChannel) {
+ return experimental::CreateCustomChannelWithInterceptors(
+ server_address_, InsecureChannelCredentials(), ChannelArguments(),
+ std::move(creators));
+ }
+#ifdef GRPC_POSIX_SOCKET
+ else if (GetParam().channel_type() == ChannelType::kFdChannel) {
+ return experimental::CreateCustomInsecureChannelWithInterceptorsFromFd(
+ "", sv_[0], ChannelArguments(), std::move(creators));
+ }
+#endif /* GRPC_POSIX_SOCKET */
+ return nullptr;
+ }
void SendRPC(const std::shared_ptr<Channel>& channel) {
- switch (GetParam().type()) {
+ switch (GetParam().rpc_type()) {
case RPCType::kSyncUnary:
MakeCall(channel);
break;
}
std::string server_address_;
+ int sv_[2];
EchoTestServiceStreamingImpl service_;
std::unique_ptr<Server> server_;
};
DummyInterceptor::Reset();
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
- creators.push_back(std::unique_ptr<LoggingInterceptorFactory>(
- new LoggingInterceptorFactory()));
+ creators.push_back(absl::make_unique<LoggingInterceptorFactory>());
// Add 20 dummy interceptors
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
- auto channel = experimental::CreateCustomChannelWithInterceptors(
- server_address_, InsecureChannelCredentials(), args, std::move(creators));
+ auto channel = CreateClientChannel(std::move(creators));
SendRPC(channel);
- LoggingInterceptor::VerifyCall(GetParam().type());
+ LoggingInterceptor::VerifyCall(GetParam().rpc_type());
// Make sure all 20 dummy interceptors were run
EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
server_ = builder.BuildAndStart();
}
- ~ClientInterceptorsEnd2endTest() { server_->Shutdown(); }
+ ~ClientInterceptorsEnd2endTest() override { server_->Shutdown(); }
std::string server_address_;
TestServiceImpl service_;
ChannelArguments args;
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
- creators.push_back(std::unique_ptr<HijackingInterceptorFactory>(
- new HijackingInterceptorFactory()));
+ creators.push_back(absl::make_unique<HijackingInterceptorFactory>());
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, nullptr, args, std::move(creators));
MakeCall(channel);
// Add 20 dummy interceptors before hijacking interceptor
creators.reserve(20);
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
- creators.push_back(std::unique_ptr<HijackingInterceptorFactory>(
- new HijackingInterceptorFactory()));
+ creators.push_back(absl::make_unique<HijackingInterceptorFactory>());
// Add 20 dummy interceptors after hijacking interceptor
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
ChannelArguments args;
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
- creators.push_back(std::unique_ptr<LoggingInterceptorFactory>(
- new LoggingInterceptorFactory()));
- creators.push_back(std::unique_ptr<HijackingInterceptorFactory>(
- new HijackingInterceptorFactory()));
+ creators.push_back(absl::make_unique<LoggingInterceptorFactory>());
+ creators.push_back(absl::make_unique<HijackingInterceptorFactory>());
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeCall(channel);
// Add 5 dummy interceptors before hijacking interceptor
creators.reserve(5);
for (auto i = 0; i < 5; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
creators.push_back(
std::unique_ptr<experimental::ClientInterceptorFactoryInterface>(
new HijackingInterceptorMakesAnotherCallFactory()));
// Add 7 dummy interceptors after hijacking interceptor
for (auto i = 0; i < 7; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
auto channel = server_->experimental().InProcessChannelWithInterceptors(
args, std::move(creators));
server_ = builder.BuildAndStart();
}
- ~ClientInterceptorsCallbackEnd2endTest() { server_->Shutdown(); }
+ ~ClientInterceptorsCallbackEnd2endTest() override { server_->Shutdown(); }
std::string server_address_;
TestServiceImpl service_;
DummyInterceptor::Reset();
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
- creators.push_back(std::unique_ptr<LoggingInterceptorFactory>(
- new LoggingInterceptorFactory()));
+ creators.push_back(absl::make_unique<LoggingInterceptorFactory>());
// Add 20 dummy interceptors
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
auto channel = server_->experimental().InProcessChannelWithInterceptors(
args, std::move(creators));
DummyInterceptor::Reset();
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
- creators.push_back(std::unique_ptr<LoggingInterceptorFactory>(
- new LoggingInterceptorFactory()));
+ creators.push_back(absl::make_unique<LoggingInterceptorFactory>());
// Add 20 dummy interceptors and 20 null interceptors
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
- creators.push_back(
- std::unique_ptr<NullInterceptorFactory>(new NullInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
+ creators.push_back(absl::make_unique<NullInterceptorFactory>());
}
auto channel = server_->experimental().InProcessChannelWithInterceptors(
args, std::move(creators));
server_ = builder.BuildAndStart();
}
- ~ClientInterceptorsStreamingEnd2endTest() { server_->Shutdown(); }
+ ~ClientInterceptorsStreamingEnd2endTest() override { server_->Shutdown(); }
std::string server_address_;
EchoTestServiceStreamingImpl service_;
DummyInterceptor::Reset();
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
- creators.push_back(std::unique_ptr<LoggingInterceptorFactory>(
- new LoggingInterceptorFactory()));
+ creators.push_back(absl::make_unique<LoggingInterceptorFactory>());
// Add 20 dummy interceptors
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
DummyInterceptor::Reset();
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
- creators.push_back(std::unique_ptr<LoggingInterceptorFactory>(
- new LoggingInterceptorFactory()));
+ creators.push_back(absl::make_unique<LoggingInterceptorFactory>());
// Add 20 dummy interceptors
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
creators.push_back(
- std::unique_ptr<ClientStreamingRpcHijackingInterceptorFactory>(
- new ClientStreamingRpcHijackingInterceptorFactory()));
+ absl::make_unique<ClientStreamingRpcHijackingInterceptorFactory>());
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
creators.push_back(
- std::unique_ptr<ServerStreamingRpcHijackingInterceptorFactory>(
- new ServerStreamingRpcHijackingInterceptorFactory()));
+ absl::make_unique<ServerStreamingRpcHijackingInterceptorFactory>());
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeServerStreamingCall(channel);
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
creators.push_back(
- std::unique_ptr<ServerStreamingRpcHijackingInterceptorFactory>(
- new ServerStreamingRpcHijackingInterceptorFactory()));
+ absl::make_unique<ServerStreamingRpcHijackingInterceptorFactory>());
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeAsyncCQServerStreamingCall(channel);
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
creators.push_back(
- std::unique_ptr<BidiStreamingRpcHijackingInterceptorFactory>(
- new BidiStreamingRpcHijackingInterceptorFactory()));
+ absl::make_unique<BidiStreamingRpcHijackingInterceptorFactory>());
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeBidiStreamingCall(channel);
DummyInterceptor::Reset();
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
- creators.push_back(std::unique_ptr<LoggingInterceptorFactory>(
- new LoggingInterceptorFactory()));
+ creators.push_back(absl::make_unique<LoggingInterceptorFactory>());
// Add 20 dummy interceptors
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
server_ = builder.BuildAndStart();
}
- ~ClientGlobalInterceptorEnd2endTest() { server_->Shutdown(); }
+ ~ClientGlobalInterceptorEnd2endTest() override { server_->Shutdown(); }
std::string server_address_;
TestServiceImpl service_;
// Add 20 dummy interceptors
creators.reserve(20);
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
// Add 20 dummy interceptors
creators.reserve(20);
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
// Add 20 dummy interceptors
creators.reserve(20);
for (auto i = 0; i < 20; i++) {
- creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
- new DummyInterceptorFactory()));
+ creators.push_back(absl::make_unique<DummyInterceptorFactory>());
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));