*
*/
-#include <algorithm>
-#include <memory>
-#include <mutex>
-#include <random>
-#include <thread>
-
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <thread>
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/gpr/env.h"
-
#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/debugger_macros.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
-
-#include <gtest/gtest.h>
+#include "test/cpp/util/test_credentials_provider.h"
#ifdef GPR_LINUX
using grpc::testing::EchoRequest;
namespace testing {
namespace {
-class FlakyNetworkTest : public ::testing::Test {
+struct TestScenario {
+ TestScenario(const grpc::string& creds_type, const grpc::string& content)
+ : credentials_type(creds_type), message_content(content) {}
+ const grpc::string credentials_type;
+ const grpc::string message_content;
+};
+
+class FlakyNetworkTest : public ::testing::TestWithParam<TestScenario> {
protected:
FlakyNetworkTest()
: server_host_("grpctest"),
interface_("lo:1"),
ipv4_address_("10.0.0.1"),
- netmask_("/32"),
- kRequestMessage_("🖖") {}
+ netmask_("/32") {}
void InterfaceUp() {
std::ostringstream cmd;
void FlakeNetwork() {
std::ostringstream cmd;
// Emulate a flaky network connection over interface_. Add a delay of 100ms
- // +/- 590ms, 3% packet loss, 1% duplicates and 0.1% corrupt packets.
+ // +/- 20ms, 0.1% packet loss, 1% duplicates and 0.01% corrupt packets.
cmd << "tc qdisc replace dev " << interface_
- << " root netem delay 100ms 50ms distribution normal loss 3% duplicate "
- "1% corrupt 0.1% ";
+ << " root netem delay 100ms 20ms distribution normal loss 0.1% "
+ "duplicate "
+ "0.1% corrupt 0.01% ";
std::system(cmd.str().c_str());
}
// ip6-looopback, but ipv6 support is not enabled by default in docker.
port_ = SERVER_PORT;
- server_.reset(new ServerData(port_));
+ server_.reset(new ServerData(port_, GetParam().credentials_type));
server_->Start(server_host_);
}
void StopServer() { server_->Shutdown(); }
if (lb_policy_name.size() > 0) {
args.SetLoadBalancingPolicyName(lb_policy_name);
} // else, default to pick first
+ auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
+ GetParam().credentials_type, &args);
std::ostringstream server_address;
server_address << server_host_ << ":" << port_;
- return CreateCustomChannel(server_address.str(),
- InsecureChannelCredentials(), args);
+ return CreateCustomChannel(server_address.str(), channel_creds, args);
}
bool SendRpc(
int timeout_ms = 0, bool wait_for_ready = false) {
auto response = std::unique_ptr<EchoResponse>(new EchoResponse());
EchoRequest request;
- request.set_message(kRequestMessage_);
+ auto& msg = GetParam().message_content;
+ request.set_message(msg);
ClientContext context;
if (timeout_ms > 0) {
context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
}
Status status = stub->Echo(&context, request, response.get());
auto ok = status.ok();
+ int stream_id = 0;
+ grpc_call* call = context.c_call();
+ if (call) {
+ grpc_chttp2_stream* stream = grpc_chttp2_stream_from_call(call);
+ if (stream) {
+ stream_id = stream->id;
+ }
+ }
if (ok) {
- gpr_log(GPR_DEBUG, "RPC returned %s\n", response->message().c_str());
+ gpr_log(GPR_DEBUG, "RPC with stream_id %d succeeded", stream_id);
} else {
- gpr_log(GPR_DEBUG, "RPC failed: %s", status.error_message().c_str());
+ gpr_log(GPR_DEBUG, "RPC with stream_id %d failed: %s", stream_id,
+ status.error_message().c_str());
}
return ok;
}
struct ServerData {
int port_;
+ const grpc::string creds_;
std::unique_ptr<Server> server_;
TestServiceImpl service_;
std::unique_ptr<std::thread> thread_;
bool server_ready_ = false;
- explicit ServerData(int port) { port_ = port; }
+ ServerData(int port, const grpc::string& creds)
+ : port_(port), creds_(creds) {}
void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_);
std::ostringstream server_address;
server_address << server_host << ":" << port_;
ServerBuilder builder;
- builder.AddListeningPort(server_address.str(),
- InsecureServerCredentials());
+ auto server_creds =
+ GetCredentialsProvider()->GetServerCredentials(creds_);
+ builder.AddListeningPort(server_address.str(), server_creds);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
std::lock_guard<std::mutex> lock(*mu);
std::unique_ptr<ServerData> server_;
const int SERVER_PORT = 32750;
int port_;
- const grpc::string kRequestMessage_;
};
+std::vector<TestScenario> CreateTestScenarios() {
+ std::vector<TestScenario> scenarios;
+ std::vector<grpc::string> credentials_types;
+ std::vector<grpc::string> messages;
+
+ credentials_types.push_back(kInsecureCredentialsType);
+ auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList();
+ for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
+ credentials_types.push_back(*sec);
+ }
+
+ messages.push_back("🖖");
+ for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024; k *= 32) {
+ grpc::string big_msg;
+ for (size_t i = 0; i < k * 1024; ++i) {
+ char c = 'a' + (i % 26);
+ big_msg += c;
+ }
+ messages.push_back(big_msg);
+ }
+ for (auto cred = credentials_types.begin(); cred != credentials_types.end();
+ ++cred) {
+ for (auto msg = messages.begin(); msg != messages.end(); msg++) {
+ scenarios.emplace_back(*cred, *msg);
+ }
+ }
+
+ return scenarios;
+}
+
+INSTANTIATE_TEST_CASE_P(FlakyNetworkTest, FlakyNetworkTest,
+ ::testing::ValuesIn(CreateTestScenarios()));
+
// Network interface connected to server flaps
-TEST_F(FlakyNetworkTest, NetworkTransition) {
+TEST_P(FlakyNetworkTest, NetworkTransition) {
const int kKeepAliveTimeMs = 1000;
const int kKeepAliveTimeoutMs = 1000;
ChannelArguments args;
}
// Traffic to server server is blackholed temporarily with keepalives enabled
-TEST_F(FlakyNetworkTest, ServerUnreachableWithKeepalive) {
+TEST_P(FlakyNetworkTest, ServerUnreachableWithKeepalive) {
const int kKeepAliveTimeMs = 1000;
const int kKeepAliveTimeoutMs = 1000;
const int kReconnectBackoffMs = 1000;
//
// Traffic to server server is blackholed temporarily with keepalives disabled
-TEST_F(FlakyNetworkTest, ServerUnreachableNoKeepalive) {
+TEST_P(FlakyNetworkTest, ServerUnreachableNoKeepalive) {
auto channel = BuildChannel("pick_first", ChannelArguments());
auto stub = BuildStub(channel);
// Channel should be in READY state after we send an RPC
}
// Send RPCs over a flaky network connection
-TEST_F(FlakyNetworkTest, FlakyNetwork) {
+TEST_P(FlakyNetworkTest, FlakyNetwork) {
const int kKeepAliveTimeMs = 1000;
const int kKeepAliveTimeoutMs = 1000;
const int kMessageCount = 100;
}
// Server is shutdown gracefully and restarted. Client keepalives are enabled
-TEST_F(FlakyNetworkTest, ServerRestartKeepaliveEnabled) {
+TEST_P(FlakyNetworkTest, ServerRestartKeepaliveEnabled) {
const int kKeepAliveTimeMs = 1000;
const int kKeepAliveTimeoutMs = 1000;
ChannelArguments args;
}
// Server is shutdown gracefully and restarted. Client keepalives are enabled
-TEST_F(FlakyNetworkTest, ServerRestartKeepaliveDisabled) {
+TEST_P(FlakyNetworkTest, ServerRestartKeepaliveDisabled) {
auto channel = BuildChannel("pick_first", ChannelArguments());
auto stub = BuildStub(channel);
// Channel should be in READY state after we send an RPC