#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/parse_address.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
+#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
private:
friend class FakeResolverResponseGenerator;
+ friend class FakeResolverResponseSetter;
- virtual ~FakeResolver();
+ ~FakeResolver() override;
- void ShutdownLocked() override { active_ = false; }
+ void ShutdownLocked() override;
void MaybeSendResultLocked();
- static void ReturnReresolutionResult(void* arg, grpc_error* error);
+ void ReturnReresolutionResult();
// passed-in parameters
grpc_channel_args* channel_args_ = nullptr;
+ RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
// If has_next_result_ is true, next_result_ is the next resolution result
// to be returned.
bool has_next_result_ = false;
// RequestReresolutionLocked().
bool has_reresolution_result_ = false;
Result reresolution_result_;
- // True between the calls to StartLocked() ShutdownLocked().
- bool active_ = false;
+ // True after the call to StartLocked().
+ bool started_ = false;
+ // True after the call to ShutdownLocked().
+ bool shutdown_ = false;
// if true, return failure
bool return_failure_ = false;
// pending re-resolution
- grpc_closure reresolution_closure_;
bool reresolution_closure_pending_ = false;
};
FakeResolver::FakeResolver(ResolverArgs args)
- : Resolver(args.combiner, std::move(args.result_handler)) {
- GRPC_CLOSURE_INIT(&reresolution_closure_, ReturnReresolutionResult, this,
- grpc_combiner_scheduler(combiner()));
- channel_args_ = grpc_channel_args_copy(args.args);
- FakeResolverResponseGenerator* response_generator =
- FakeResolverResponseGenerator::GetFromArgs(args.args);
- if (response_generator != nullptr) {
- response_generator->resolver_ = this;
- if (response_generator->has_result_) {
- response_generator->SetResponse(std::move(response_generator->result_));
- response_generator->has_result_ = false;
- }
+ : Resolver(std::move(args.work_serializer), std::move(args.result_handler)),
+ response_generator_(
+ FakeResolverResponseGenerator::GetFromArgs(args.args)) {
+ // Channels sharing the same subchannels may have different resolver response
+ // generators. If we don't remove this arg, subchannel pool will create new
+ // subchannels for the same address instead of reusing existing ones because
+ // of different values of this channel arg.
+ const char* args_to_remove[] = {GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
+ channel_args_ = grpc_channel_args_copy_and_remove(
+ args.args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove));
+ if (response_generator_ != nullptr) {
+ response_generator_->SetFakeResolver(Ref());
}
}
FakeResolver::~FakeResolver() { grpc_channel_args_destroy(channel_args_); }
void FakeResolver::StartLocked() {
- active_ = true;
+ started_ = true;
MaybeSendResultLocked();
}
if (!reresolution_closure_pending_) {
reresolution_closure_pending_ = true;
Ref().release(); // ref held by closure
- GRPC_CLOSURE_SCHED(&reresolution_closure_, GRPC_ERROR_NONE);
+ work_serializer()->Run([this]() { ReturnReresolutionResult(); },
+ DEBUG_LOCATION);
}
}
}
+void FakeResolver::ShutdownLocked() {
+ shutdown_ = true;
+ if (response_generator_ != nullptr) {
+ response_generator_->SetFakeResolver(nullptr);
+ response_generator_.reset();
+ }
+}
+
void FakeResolver::MaybeSendResultLocked() {
- if (!active_) return;
+ if (!started_ || shutdown_) return;
if (return_failure_) {
// TODO(roth): Change resolver result generator to be able to inject
// the error to be returned.
}
}
-void FakeResolver::ReturnReresolutionResult(void* arg, grpc_error* error) {
- FakeResolver* self = static_cast<FakeResolver*>(arg);
- self->reresolution_closure_pending_ = false;
- self->MaybeSendResultLocked();
- self->Unref();
+void FakeResolver::ReturnReresolutionResult() {
+ reresolution_closure_pending_ = false;
+ MaybeSendResultLocked();
+ Unref();
}
-//
-// FakeResolverResponseGenerator
-//
+class FakeResolverResponseSetter {
+ public:
+ explicit FakeResolverResponseSetter(RefCountedPtr<FakeResolver> resolver,
+ Resolver::Result result,
+ bool has_result = false,
+ bool immediate = true)
+ : resolver_(std::move(resolver)),
+ result_(std::move(result)),
+ has_result_(has_result),
+ immediate_(immediate) {}
+ void SetResponseLocked();
+ void SetReresolutionResponseLocked();
+ void SetFailureLocked();
-struct SetResponseClosureArg {
- grpc_closure set_response_closure;
- FakeResolverResponseGenerator* generator;
- Resolver::Result result;
- bool has_result = false;
- bool immediate = true;
+ private:
+ RefCountedPtr<FakeResolver> resolver_;
+ Resolver::Result result_;
+ bool has_result_;
+ bool immediate_;
};
-void FakeResolverResponseGenerator::SetResponseLocked(void* arg,
- grpc_error* error) {
- SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
- FakeResolver* resolver = closure_arg->generator->resolver_;
- resolver->next_result_ = std::move(closure_arg->result);
- resolver->has_next_result_ = true;
- resolver->MaybeSendResultLocked();
- Delete(closure_arg);
+// Deletes object when done
+void FakeResolverResponseSetter::SetReresolutionResponseLocked() {
+ if (!resolver_->shutdown_) {
+ resolver_->reresolution_result_ = std::move(result_);
+ resolver_->has_reresolution_result_ = has_result_;
+ }
+ delete this;
}
-void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
- if (resolver_ != nullptr) {
- SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
- closure_arg->generator = this;
- closure_arg->result = std::move(result);
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetResponseLocked,
- closure_arg,
- grpc_combiner_scheduler(resolver_->combiner())),
- GRPC_ERROR_NONE);
- } else {
- GPR_ASSERT(!has_result_);
- has_result_ = true;
- result_ = std::move(result);
+// Deletes object when done
+void FakeResolverResponseSetter::SetResponseLocked() {
+ if (!resolver_->shutdown_) {
+ resolver_->next_result_ = std::move(result_);
+ resolver_->has_next_result_ = true;
+ resolver_->MaybeSendResultLocked();
+ }
+ delete this;
+}
+
+// Deletes object when done
+void FakeResolverResponseSetter::SetFailureLocked() {
+ if (!resolver_->shutdown_) {
+ resolver_->return_failure_ = true;
+ if (immediate_) resolver_->MaybeSendResultLocked();
}
+ delete this;
}
-void FakeResolverResponseGenerator::SetReresolutionResponseLocked(
- void* arg, grpc_error* error) {
- SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
- FakeResolver* resolver = closure_arg->generator->resolver_;
- resolver->reresolution_result_ = std::move(closure_arg->result);
- resolver->has_reresolution_result_ = closure_arg->has_result;
- Delete(closure_arg);
+//
+// FakeResolverResponseGenerator
+//
+
+FakeResolverResponseGenerator::FakeResolverResponseGenerator() {}
+
+FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {}
+
+void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
+ RefCountedPtr<FakeResolver> resolver;
+ {
+ MutexLock lock(&mu_);
+ if (resolver_ == nullptr) {
+ has_result_ = true;
+ result_ = std::move(result);
+ return;
+ }
+ resolver = resolver_->Ref();
+ }
+ FakeResolverResponseSetter* arg =
+ new FakeResolverResponseSetter(resolver, std::move(result));
+ resolver->work_serializer()->Run([arg]() { arg->SetResponseLocked(); },
+ DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::SetReresolutionResponse(
Resolver::Result result) {
- GPR_ASSERT(resolver_ != nullptr);
- SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
- closure_arg->generator = this;
- closure_arg->result = std::move(result);
- closure_arg->has_result = true;
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
- SetReresolutionResponseLocked, closure_arg,
- grpc_combiner_scheduler(resolver_->combiner())),
- GRPC_ERROR_NONE);
+ RefCountedPtr<FakeResolver> resolver;
+ {
+ MutexLock lock(&mu_);
+ GPR_ASSERT(resolver_ != nullptr);
+ resolver = resolver_->Ref();
+ }
+ FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
+ resolver, std::move(result), true /* has_result */);
+ resolver->work_serializer()->Run(
+ [arg]() { arg->SetReresolutionResponseLocked(); }, DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
- GPR_ASSERT(resolver_ != nullptr);
- SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
- closure_arg->generator = this;
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
- SetReresolutionResponseLocked, closure_arg,
- grpc_combiner_scheduler(resolver_->combiner())),
- GRPC_ERROR_NONE);
-}
-
-void FakeResolverResponseGenerator::SetFailureLocked(void* arg,
- grpc_error* error) {
- SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
- FakeResolver* resolver = closure_arg->generator->resolver_;
- resolver->return_failure_ = true;
- if (closure_arg->immediate) resolver->MaybeSendResultLocked();
- Delete(closure_arg);
+ RefCountedPtr<FakeResolver> resolver;
+ {
+ MutexLock lock(&mu_);
+ GPR_ASSERT(resolver_ != nullptr);
+ resolver = resolver_->Ref();
+ }
+ FakeResolverResponseSetter* arg =
+ new FakeResolverResponseSetter(resolver, Resolver::Result());
+ resolver->work_serializer()->Run(
+ [arg]() { arg->SetReresolutionResponseLocked(); }, DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::SetFailure() {
- GPR_ASSERT(resolver_ != nullptr);
- SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
- closure_arg->generator = this;
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked,
- closure_arg,
- grpc_combiner_scheduler(resolver_->combiner())),
- GRPC_ERROR_NONE);
+ RefCountedPtr<FakeResolver> resolver;
+ {
+ MutexLock lock(&mu_);
+ GPR_ASSERT(resolver_ != nullptr);
+ resolver = resolver_->Ref();
+ }
+ FakeResolverResponseSetter* arg =
+ new FakeResolverResponseSetter(resolver, Resolver::Result());
+ resolver->work_serializer()->Run([arg]() { arg->SetFailureLocked(); },
+ DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::SetFailureOnReresolution() {
- GPR_ASSERT(resolver_ != nullptr);
- SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
- closure_arg->generator = this;
- closure_arg->immediate = false;
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked,
- closure_arg,
- grpc_combiner_scheduler(resolver_->combiner())),
- GRPC_ERROR_NONE);
+ RefCountedPtr<FakeResolver> resolver;
+ {
+ MutexLock lock(&mu_);
+ GPR_ASSERT(resolver_ != nullptr);
+ resolver = resolver_->Ref();
+ }
+ FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
+ resolver, Resolver::Result(), false /* has_result */,
+ false /* immediate */);
+ resolver->work_serializer()->Run([arg]() { arg->SetFailureLocked(); },
+ DEBUG_LOCATION);
+}
+
+void FakeResolverResponseGenerator::SetFakeResolver(
+ RefCountedPtr<FakeResolver> resolver) {
+ MutexLock lock(&mu_);
+ resolver_ = std::move(resolver);
+ if (resolver_ == nullptr) return;
+ if (has_result_) {
+ FakeResolverResponseSetter* arg =
+ new FakeResolverResponseSetter(resolver_, std::move(result_));
+ resolver_->work_serializer()->Run([arg]() { arg->SetResponseLocked(); },
+ DEBUG_LOCATION);
+ has_result_ = false;
+ }
}
namespace {
return arg;
}
-FakeResolverResponseGenerator* FakeResolverResponseGenerator::GetFromArgs(
- const grpc_channel_args* args) {
+RefCountedPtr<FakeResolverResponseGenerator>
+FakeResolverResponseGenerator::GetFromArgs(const grpc_channel_args* args) {
const grpc_arg* arg =
grpc_channel_args_find(args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) return nullptr;
- return static_cast<FakeResolverResponseGenerator*>(arg->value.pointer.p);
+ return static_cast<FakeResolverResponseGenerator*>(arg->value.pointer.p)
+ ->Ref();
}
//
class FakeResolverFactory : public ResolverFactory {
public:
+ bool IsValidUri(const grpc_uri* /*uri*/) const override { return true; }
+
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
- return OrphanablePtr<Resolver>(New<FakeResolver>(std::move(args)));
+ return MakeOrphanable<FakeResolver>(std::move(args));
}
const char* scheme() const override { return "fake"; }
void grpc_resolver_fake_init() {
grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
- grpc_core::UniquePtr<grpc_core::ResolverFactory>(
- grpc_core::New<grpc_core::FakeResolverFactory>()));
+ absl::make_unique<grpc_core::FakeResolverFactory>());
}
void grpc_resolver_fake_shutdown() {}