Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / resolver / fake / fake_resolver.cc
index 85b9bea..005de55 100644 (file)
 #include <grpc/support/alloc.h>
 #include <grpc/support/string_util.h>
 
-#include "src/core/ext/filters/client_channel/parse_address.h"
 #include "src/core/ext/filters/client_channel/resolver_registry.h"
 #include "src/core/ext/filters/client_channel/server_address.h"
 #include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/gpr/host_port.h"
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gpr/useful.h"
 #include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/parse_address.h"
 #include "src/core/lib/iomgr/resolve_address.h"
 #include "src/core/lib/iomgr/unix_sockets_posix.h"
+#include "src/core/lib/iomgr/work_serializer.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/slice/slice_string_helpers.h"
 
@@ -58,17 +57,19 @@ class FakeResolver : public Resolver {
 
  private:
   friend class FakeResolverResponseGenerator;
+  friend class FakeResolverResponseSetter;
 
-  virtual ~FakeResolver();
+  ~FakeResolver() override;
 
-  void ShutdownLocked() override { active_ = false; }
+  void ShutdownLocked() override;
 
   void MaybeSendResultLocked();
 
-  static void ReturnReresolutionResult(void* arg, grpc_error* error);
+  void ReturnReresolutionResult();
 
   // passed-in parameters
   grpc_channel_args* channel_args_ = nullptr;
+  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
   // If has_next_result_ is true, next_result_ is the next resolution result
   // to be returned.
   bool has_next_result_ = false;
@@ -77,35 +78,36 @@ class FakeResolver : public Resolver {
   // RequestReresolutionLocked().
   bool has_reresolution_result_ = false;
   Result reresolution_result_;
-  // True between the calls to StartLocked() ShutdownLocked().
-  bool active_ = false;
+  // True after the call to StartLocked().
+  bool started_ = false;
+  // True after the call to ShutdownLocked().
+  bool shutdown_ = false;
   // if true, return failure
   bool return_failure_ = false;
   // pending re-resolution
-  grpc_closure reresolution_closure_;
   bool reresolution_closure_pending_ = false;
 };
 
 FakeResolver::FakeResolver(ResolverArgs args)
-    : Resolver(args.combiner, std::move(args.result_handler)) {
-  GRPC_CLOSURE_INIT(&reresolution_closure_, ReturnReresolutionResult, this,
-                    grpc_combiner_scheduler(combiner()));
-  channel_args_ = grpc_channel_args_copy(args.args);
-  FakeResolverResponseGenerator* response_generator =
-      FakeResolverResponseGenerator::GetFromArgs(args.args);
-  if (response_generator != nullptr) {
-    response_generator->resolver_ = this;
-    if (response_generator->has_result_) {
-      response_generator->SetResponse(std::move(response_generator->result_));
-      response_generator->has_result_ = false;
-    }
+    : Resolver(std::move(args.work_serializer), std::move(args.result_handler)),
+      response_generator_(
+          FakeResolverResponseGenerator::GetFromArgs(args.args)) {
+  // Channels sharing the same subchannels may have different resolver response
+  // generators. If we don't remove this arg, subchannel pool will create new
+  // subchannels for the same address instead of reusing existing ones because
+  // of different values of this channel arg.
+  const char* args_to_remove[] = {GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
+  channel_args_ = grpc_channel_args_copy_and_remove(
+      args.args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove));
+  if (response_generator_ != nullptr) {
+    response_generator_->SetFakeResolver(Ref());
   }
 }
 
 FakeResolver::~FakeResolver() { grpc_channel_args_destroy(channel_args_); }
 
 void FakeResolver::StartLocked() {
-  active_ = true;
+  started_ = true;
   MaybeSendResultLocked();
 }
 
@@ -119,13 +121,22 @@ void FakeResolver::RequestReresolutionLocked() {
     if (!reresolution_closure_pending_) {
       reresolution_closure_pending_ = true;
       Ref().release();  // ref held by closure
-      GRPC_CLOSURE_SCHED(&reresolution_closure_, GRPC_ERROR_NONE);
+      work_serializer()->Run([this]() { ReturnReresolutionResult(); },
+                             DEBUG_LOCATION);
     }
   }
 }
 
+void FakeResolver::ShutdownLocked() {
+  shutdown_ = true;
+  if (response_generator_ != nullptr) {
+    response_generator_->SetFakeResolver(nullptr);
+    response_generator_.reset();
+  }
+}
+
 void FakeResolver::MaybeSendResultLocked() {
-  if (!active_) return;
+  if (!started_ || shutdown_) return;
   if (return_failure_) {
     // TODO(roth): Change resolver result generator to be able to inject
     // the error to be returned.
@@ -149,116 +160,152 @@ void FakeResolver::MaybeSendResultLocked() {
   }
 }
 
-void FakeResolver::ReturnReresolutionResult(void* arg, grpc_error* error) {
-  FakeResolver* self = static_cast<FakeResolver*>(arg);
-  self->reresolution_closure_pending_ = false;
-  self->MaybeSendResultLocked();
-  self->Unref();
+void FakeResolver::ReturnReresolutionResult() {
+  reresolution_closure_pending_ = false;
+  MaybeSendResultLocked();
+  Unref();
 }
 
-//
-// FakeResolverResponseGenerator
-//
+class FakeResolverResponseSetter {
+ public:
+  explicit FakeResolverResponseSetter(RefCountedPtr<FakeResolver> resolver,
+                                      Resolver::Result result,
+                                      bool has_result = false,
+                                      bool immediate = true)
+      : resolver_(std::move(resolver)),
+        result_(std::move(result)),
+        has_result_(has_result),
+        immediate_(immediate) {}
+  void SetResponseLocked();
+  void SetReresolutionResponseLocked();
+  void SetFailureLocked();
 
-struct SetResponseClosureArg {
-  grpc_closure set_response_closure;
-  FakeResolverResponseGenerator* generator;
-  Resolver::Result result;
-  bool has_result = false;
-  bool immediate = true;
+ private:
+  RefCountedPtr<FakeResolver> resolver_;
+  Resolver::Result result_;
+  bool has_result_;
+  bool immediate_;
 };
 
-void FakeResolverResponseGenerator::SetResponseLocked(void* arg,
-                                                      grpc_error* error) {
-  SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
-  FakeResolver* resolver = closure_arg->generator->resolver_;
-  resolver->next_result_ = std::move(closure_arg->result);
-  resolver->has_next_result_ = true;
-  resolver->MaybeSendResultLocked();
-  Delete(closure_arg);
+// Deletes object when done
+void FakeResolverResponseSetter::SetReresolutionResponseLocked() {
+  if (!resolver_->shutdown_) {
+    resolver_->reresolution_result_ = std::move(result_);
+    resolver_->has_reresolution_result_ = has_result_;
+  }
+  delete this;
 }
 
-void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
-  if (resolver_ != nullptr) {
-    SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
-    closure_arg->generator = this;
-    closure_arg->result = std::move(result);
-    GRPC_CLOSURE_SCHED(
-        GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetResponseLocked,
-                          closure_arg,
-                          grpc_combiner_scheduler(resolver_->combiner())),
-        GRPC_ERROR_NONE);
-  } else {
-    GPR_ASSERT(!has_result_);
-    has_result_ = true;
-    result_ = std::move(result);
+// Deletes object when done
+void FakeResolverResponseSetter::SetResponseLocked() {
+  if (!resolver_->shutdown_) {
+    resolver_->next_result_ = std::move(result_);
+    resolver_->has_next_result_ = true;
+    resolver_->MaybeSendResultLocked();
+  }
+  delete this;
+}
+
+// Deletes object when done
+void FakeResolverResponseSetter::SetFailureLocked() {
+  if (!resolver_->shutdown_) {
+    resolver_->return_failure_ = true;
+    if (immediate_) resolver_->MaybeSendResultLocked();
   }
+  delete this;
 }
 
-void FakeResolverResponseGenerator::SetReresolutionResponseLocked(
-    void* arg, grpc_error* error) {
-  SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
-  FakeResolver* resolver = closure_arg->generator->resolver_;
-  resolver->reresolution_result_ = std::move(closure_arg->result);
-  resolver->has_reresolution_result_ = closure_arg->has_result;
-  Delete(closure_arg);
+//
+// FakeResolverResponseGenerator
+//
+
+FakeResolverResponseGenerator::FakeResolverResponseGenerator() {}
+
+FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {}
+
+void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
+  RefCountedPtr<FakeResolver> resolver;
+  {
+    MutexLock lock(&mu_);
+    if (resolver_ == nullptr) {
+      has_result_ = true;
+      result_ = std::move(result);
+      return;
+    }
+    resolver = resolver_->Ref();
+  }
+  FakeResolverResponseSetter* arg =
+      new FakeResolverResponseSetter(resolver, std::move(result));
+  resolver->work_serializer()->Run([arg]() { arg->SetResponseLocked(); },
+                                   DEBUG_LOCATION);
 }
 
 void FakeResolverResponseGenerator::SetReresolutionResponse(
     Resolver::Result result) {
-  GPR_ASSERT(resolver_ != nullptr);
-  SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
-  closure_arg->generator = this;
-  closure_arg->result = std::move(result);
-  closure_arg->has_result = true;
-  GRPC_CLOSURE_SCHED(
-      GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
-                        SetReresolutionResponseLocked, closure_arg,
-                        grpc_combiner_scheduler(resolver_->combiner())),
-      GRPC_ERROR_NONE);
+  RefCountedPtr<FakeResolver> resolver;
+  {
+    MutexLock lock(&mu_);
+    GPR_ASSERT(resolver_ != nullptr);
+    resolver = resolver_->Ref();
+  }
+  FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
+      resolver, std::move(result), true /* has_result */);
+  resolver->work_serializer()->Run(
+      [arg]() { arg->SetReresolutionResponseLocked(); }, DEBUG_LOCATION);
 }
 
 void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
-  GPR_ASSERT(resolver_ != nullptr);
-  SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
-  closure_arg->generator = this;
-  GRPC_CLOSURE_SCHED(
-      GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
-                        SetReresolutionResponseLocked, closure_arg,
-                        grpc_combiner_scheduler(resolver_->combiner())),
-      GRPC_ERROR_NONE);
-}
-
-void FakeResolverResponseGenerator::SetFailureLocked(void* arg,
-                                                     grpc_error* error) {
-  SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
-  FakeResolver* resolver = closure_arg->generator->resolver_;
-  resolver->return_failure_ = true;
-  if (closure_arg->immediate) resolver->MaybeSendResultLocked();
-  Delete(closure_arg);
+  RefCountedPtr<FakeResolver> resolver;
+  {
+    MutexLock lock(&mu_);
+    GPR_ASSERT(resolver_ != nullptr);
+    resolver = resolver_->Ref();
+  }
+  FakeResolverResponseSetter* arg =
+      new FakeResolverResponseSetter(resolver, Resolver::Result());
+  resolver->work_serializer()->Run(
+      [arg]() { arg->SetReresolutionResponseLocked(); }, DEBUG_LOCATION);
 }
 
 void FakeResolverResponseGenerator::SetFailure() {
-  GPR_ASSERT(resolver_ != nullptr);
-  SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
-  closure_arg->generator = this;
-  GRPC_CLOSURE_SCHED(
-      GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked,
-                        closure_arg,
-                        grpc_combiner_scheduler(resolver_->combiner())),
-      GRPC_ERROR_NONE);
+  RefCountedPtr<FakeResolver> resolver;
+  {
+    MutexLock lock(&mu_);
+    GPR_ASSERT(resolver_ != nullptr);
+    resolver = resolver_->Ref();
+  }
+  FakeResolverResponseSetter* arg =
+      new FakeResolverResponseSetter(resolver, Resolver::Result());
+  resolver->work_serializer()->Run([arg]() { arg->SetFailureLocked(); },
+                                   DEBUG_LOCATION);
 }
 
 void FakeResolverResponseGenerator::SetFailureOnReresolution() {
-  GPR_ASSERT(resolver_ != nullptr);
-  SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
-  closure_arg->generator = this;
-  closure_arg->immediate = false;
-  GRPC_CLOSURE_SCHED(
-      GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked,
-                        closure_arg,
-                        grpc_combiner_scheduler(resolver_->combiner())),
-      GRPC_ERROR_NONE);
+  RefCountedPtr<FakeResolver> resolver;
+  {
+    MutexLock lock(&mu_);
+    GPR_ASSERT(resolver_ != nullptr);
+    resolver = resolver_->Ref();
+  }
+  FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
+      resolver, Resolver::Result(), false /* has_result */,
+      false /* immediate */);
+  resolver->work_serializer()->Run([arg]() { arg->SetFailureLocked(); },
+                                   DEBUG_LOCATION);
+}
+
+void FakeResolverResponseGenerator::SetFakeResolver(
+    RefCountedPtr<FakeResolver> resolver) {
+  MutexLock lock(&mu_);
+  resolver_ = std::move(resolver);
+  if (resolver_ == nullptr) return;
+  if (has_result_) {
+    FakeResolverResponseSetter* arg =
+        new FakeResolverResponseSetter(resolver_, std::move(result_));
+    resolver_->work_serializer()->Run([arg]() { arg->SetResponseLocked(); },
+                                      DEBUG_LOCATION);
+    has_result_ = false;
+  }
 }
 
 namespace {
@@ -298,12 +345,13 @@ grpc_arg FakeResolverResponseGenerator::MakeChannelArg(
   return arg;
 }
 
-FakeResolverResponseGenerator* FakeResolverResponseGenerator::GetFromArgs(
-    const grpc_channel_args* args) {
+RefCountedPtr<FakeResolverResponseGenerator>
+FakeResolverResponseGenerator::GetFromArgs(const grpc_channel_args* args) {
   const grpc_arg* arg =
       grpc_channel_args_find(args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
   if (arg == nullptr || arg->type != GRPC_ARG_POINTER) return nullptr;
-  return static_cast<FakeResolverResponseGenerator*>(arg->value.pointer.p);
+  return static_cast<FakeResolverResponseGenerator*>(arg->value.pointer.p)
+      ->Ref();
 }
 
 //
@@ -314,8 +362,10 @@ namespace {
 
 class FakeResolverFactory : public ResolverFactory {
  public:
+  bool IsValidUri(const grpc_uri* /*uri*/) const override { return true; }
+
   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
-    return OrphanablePtr<Resolver>(New<FakeResolver>(std::move(args)));
+    return MakeOrphanable<FakeResolver>(std::move(args));
   }
 
   const char* scheme() const override { return "fake"; }
@@ -327,8 +377,7 @@ class FakeResolverFactory : public ResolverFactory {
 
 void grpc_resolver_fake_init() {
   grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
-      grpc_core::UniquePtr<grpc_core::ResolverFactory>(
-          grpc_core::New<grpc_core::FakeResolverFactory>()));
+      absl::make_unique<grpc_core::FakeResolverFactory>());
 }
 
 void grpc_resolver_fake_shutdown() {}