- add sources.
[platform/framework/web/crosswalk.git] / src / net / proxy / multi_threaded_proxy_resolver.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
6
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "base/metrics/histogram.h"
11 #include "base/strings/string_util.h"
12 #include "base/strings/stringprintf.h"
13 #include "base/threading/thread.h"
14 #include "base/threading/thread_restrictions.h"
15 #include "net/base/net_errors.h"
16 #include "net/base/net_log.h"
17 #include "net/proxy/proxy_info.h"
18
19 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
20 //               data when SetPacScript fails. That will reclaim memory when
21 //               testing bogus scripts.
22
23 namespace net {
24
25 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
26 // thread and a synchronous ProxyResolver (which will be operated on said
27 // thread.)
28 class MultiThreadedProxyResolver::Executor
29     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
30  public:
31   // |coordinator| must remain valid throughout our lifetime. It is used to
32   // signal when the executor is ready to receive work by calling
33   // |coordinator->OnExecutorReady()|.
34   // The constructor takes ownership of |resolver|.
35   // |thread_number| is an identifier used when naming the worker thread.
36   Executor(MultiThreadedProxyResolver* coordinator,
37            ProxyResolver* resolver,
38            int thread_number);
39
40   // Submit a job to this executor.
41   void StartJob(Job* job);
42
43   // Callback for when a job has completed running on the executor's thread.
44   void OnJobCompleted(Job* job);
45
46   // Cleanup the executor. Cancels all outstanding work, and frees the thread
47   // and resolver.
48   void Destroy();
49
50   void PurgeMemory();
51
52   // Returns the outstanding job, or NULL.
53   Job* outstanding_job() const { return outstanding_job_.get(); }
54
55   ProxyResolver* resolver() { return resolver_.get(); }
56
57   int thread_number() const { return thread_number_; }
58
59  private:
60   friend class base::RefCountedThreadSafe<Executor>;
61   ~Executor();
62
63   MultiThreadedProxyResolver* coordinator_;
64   const int thread_number_;
65
66   // The currently active job for this executor (either a SetPacScript or
67   // GetProxyForURL task).
68   scoped_refptr<Job> outstanding_job_;
69
70   // The synchronous resolver implementation.
71   scoped_ptr<ProxyResolver> resolver_;
72
73   // The thread where |resolver_| is run on.
74   // Note that declaration ordering is important here. |thread_| needs to be
75   // destroyed *before* |resolver_|, in case |resolver_| is currently
76   // executing on |thread_|.
77   scoped_ptr<base::Thread> thread_;
78 };
79
80 // MultiThreadedProxyResolver::Job ---------------------------------------------
81
82 class MultiThreadedProxyResolver::Job
83     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
84  public:
85   // Identifies the subclass of Job (only being used for debugging purposes).
86   enum Type {
87     TYPE_GET_PROXY_FOR_URL,
88     TYPE_SET_PAC_SCRIPT,
89     TYPE_SET_PAC_SCRIPT_INTERNAL,
90   };
91
92   Job(Type type, const CompletionCallback& callback)
93       : type_(type),
94         callback_(callback),
95         executor_(NULL),
96         was_cancelled_(false) {
97   }
98
99   void set_executor(Executor* executor) {
100     executor_ = executor;
101   }
102
103   // The "executor" is the job runner that is scheduling this job. If
104   // this job has not been submitted to an executor yet, this will be
105   // NULL (and we know it hasn't started yet).
106   Executor* executor() {
107     return executor_;
108   }
109
110   // Mark the job as having been cancelled.
111   void Cancel() {
112     was_cancelled_ = true;
113   }
114
115   // Returns true if Cancel() has been called.
116   bool was_cancelled() const { return was_cancelled_; }
117
118   Type type() const { return type_; }
119
120   // Returns true if this job still has a user callback. Some jobs
121   // do not have a user callback, because they were helper jobs
122   // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
123   //
124   // Otherwise jobs that correspond with user-initiated work will
125   // have a non-null callback up until the callback is run.
126   bool has_user_callback() const { return !callback_.is_null(); }
127
128   // This method is called when the job is inserted into a wait queue
129   // because no executors were ready to accept it.
130   virtual void WaitingForThread() {}
131
132   // This method is called just before the job is posted to the work thread.
133   virtual void FinishedWaitingForThread() {}
134
135   // This method is called on the worker thread to do the job's work. On
136   // completion, implementors are expected to call OnJobCompleted() on
137   // |origin_loop|.
138   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
139
140  protected:
141   void OnJobCompleted() {
142     // |executor_| will be NULL if the executor has already been deleted.
143     if (executor_)
144       executor_->OnJobCompleted(this);
145   }
146
147   void RunUserCallback(int result) {
148     DCHECK(has_user_callback());
149     CompletionCallback callback = callback_;
150     // Reset the callback so has_user_callback() will now return false.
151     callback_.Reset();
152     callback.Run(result);
153   }
154
155   friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
156
157   virtual ~Job() {}
158
159  private:
160   const Type type_;
161   CompletionCallback callback_;
162   Executor* executor_;
163   bool was_cancelled_;
164 };
165
166 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
167
168 // Runs on the worker thread to call ProxyResolver::SetPacScript.
169 class MultiThreadedProxyResolver::SetPacScriptJob
170     : public MultiThreadedProxyResolver::Job {
171  public:
172   SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
173                   const CompletionCallback& callback)
174     : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
175                                 TYPE_SET_PAC_SCRIPT_INTERNAL,
176           callback),
177       script_data_(script_data) {
178   }
179
180   // Runs on the worker thread.
181   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
182     ProxyResolver* resolver = executor()->resolver();
183     int rv = resolver->SetPacScript(script_data_, CompletionCallback());
184
185     DCHECK_NE(rv, ERR_IO_PENDING);
186     origin_loop->PostTask(
187         FROM_HERE,
188         base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
189   }
190
191  protected:
192   virtual ~SetPacScriptJob() {}
193
194  private:
195   // Runs the completion callback on the origin thread.
196   void RequestComplete(int result_code) {
197     // The task may have been cancelled after it was started.
198     if (!was_cancelled() && has_user_callback()) {
199       RunUserCallback(result_code);
200     }
201     OnJobCompleted();
202   }
203
204   const scoped_refptr<ProxyResolverScriptData> script_data_;
205 };
206
207 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
208
209 class MultiThreadedProxyResolver::GetProxyForURLJob
210     : public MultiThreadedProxyResolver::Job {
211  public:
212   // |url|         -- the URL of the query.
213   // |results|     -- the structure to fill with proxy resolve results.
214   GetProxyForURLJob(const GURL& url,
215                     ProxyInfo* results,
216                     const CompletionCallback& callback,
217                     const BoundNetLog& net_log)
218       : Job(TYPE_GET_PROXY_FOR_URL, callback),
219         results_(results),
220         net_log_(net_log),
221         url_(url),
222         was_waiting_for_thread_(false) {
223     DCHECK(!callback.is_null());
224     start_time_ = base::TimeTicks::Now();
225   }
226
227   BoundNetLog* net_log() { return &net_log_; }
228
229   virtual void WaitingForThread() OVERRIDE {
230     was_waiting_for_thread_ = true;
231     net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
232   }
233
234   virtual void FinishedWaitingForThread() OVERRIDE {
235     DCHECK(executor());
236
237     submitted_to_thread_time_ = base::TimeTicks::Now();
238
239     if (was_waiting_for_thread_) {
240       net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
241     }
242
243     net_log_.AddEvent(
244         NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
245         NetLog::IntegerCallback("thread_number", executor()->thread_number()));
246   }
247
248   // Runs on the worker thread.
249   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
250     ProxyResolver* resolver = executor()->resolver();
251     int rv = resolver->GetProxyForURL(
252         url_, &results_buf_, CompletionCallback(), NULL, net_log_);
253     DCHECK_NE(rv, ERR_IO_PENDING);
254
255     origin_loop->PostTask(
256         FROM_HERE,
257         base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
258   }
259
260  protected:
261   virtual ~GetProxyForURLJob() {}
262
263  private:
264   // Runs the completion callback on the origin thread.
265   void QueryComplete(int result_code) {
266     // The Job may have been cancelled after it was started.
267     if (!was_cancelled()) {
268       RecordPerformanceMetrics();
269       if (result_code >= OK) {  // Note: unit-tests use values > 0.
270         results_->Use(results_buf_);
271       }
272       RunUserCallback(result_code);
273     }
274     OnJobCompleted();
275   }
276
277   void RecordPerformanceMetrics() {
278     DCHECK(!was_cancelled());
279
280     base::TimeTicks now = base::TimeTicks::Now();
281
282     // Log the total time the request took to complete.
283     UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Time",
284                                now - start_time_);
285
286     // Log the time the request was stalled waiting for a thread to free up.
287     UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Thread_Wait_Time",
288                                submitted_to_thread_time_ - start_time_);
289   }
290
291   // Must only be used on the "origin" thread.
292   ProxyInfo* results_;
293
294   // Can be used on either "origin" or worker thread.
295   BoundNetLog net_log_;
296   const GURL url_;
297
298   // Usable from within DoQuery on the worker thread.
299   ProxyInfo results_buf_;
300
301   base::TimeTicks start_time_;
302   base::TimeTicks submitted_to_thread_time_;
303
304   bool was_waiting_for_thread_;
305 };
306
307 // MultiThreadedProxyResolver::Executor ----------------------------------------
308
309 MultiThreadedProxyResolver::Executor::Executor(
310     MultiThreadedProxyResolver* coordinator,
311     ProxyResolver* resolver,
312     int thread_number)
313     : coordinator_(coordinator),
314       thread_number_(thread_number),
315       resolver_(resolver) {
316   DCHECK(coordinator);
317   DCHECK(resolver);
318   // Start up the thread.
319   // Note that it is safe to pass a temporary C-String to Thread(), as it will
320   // make a copy.
321   std::string thread_name =
322       base::StringPrintf("PAC thread #%d", thread_number);
323   thread_.reset(new base::Thread(thread_name.c_str()));
324   CHECK(thread_->Start());
325 }
326
327 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
328   DCHECK(!outstanding_job_.get());
329   outstanding_job_ = job;
330
331   // Run the job. Once it has completed (regardless of whether it was
332   // cancelled), it will invoke OnJobCompleted() on this thread.
333   job->set_executor(this);
334   job->FinishedWaitingForThread();
335   thread_->message_loop()->PostTask(
336       FROM_HERE,
337       base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
338 }
339
340 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
341   DCHECK_EQ(job, outstanding_job_.get());
342   outstanding_job_ = NULL;
343   coordinator_->OnExecutorReady(this);
344 }
345
346 void MultiThreadedProxyResolver::Executor::Destroy() {
347   DCHECK(coordinator_);
348
349   {
350     // See http://crbug.com/69710.
351     base::ThreadRestrictions::ScopedAllowIO allow_io;
352
353     // Join the worker thread.
354     thread_.reset();
355   }
356
357   // Cancel any outstanding job.
358   if (outstanding_job_.get()) {
359     outstanding_job_->Cancel();
360     // Orphan the job (since this executor may be deleted soon).
361     outstanding_job_->set_executor(NULL);
362   }
363
364   // It is now safe to free the ProxyResolver, since all the tasks that
365   // were using it on the resolver thread have completed.
366   resolver_.reset();
367
368   // Null some stuff as a precaution.
369   coordinator_ = NULL;
370   outstanding_job_ = NULL;
371 }
372
373 void MultiThreadedProxyResolver::Executor::PurgeMemory() {
374   thread_->message_loop()->PostTask(
375       FROM_HERE,
376       base::Bind(&ProxyResolver::PurgeMemory,
377                  base::Unretained(resolver_.get())));
378 }
379
380 MultiThreadedProxyResolver::Executor::~Executor() {
381   // The important cleanup happens as part of Destroy(), which should always be
382   // called first.
383   DCHECK(!coordinator_) << "Destroy() was not called";
384   DCHECK(!thread_.get());
385   DCHECK(!resolver_.get());
386   DCHECK(!outstanding_job_.get());
387 }
388
389 // MultiThreadedProxyResolver --------------------------------------------------
390
391 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
392     ProxyResolverFactory* resolver_factory,
393     size_t max_num_threads)
394     : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
395       resolver_factory_(resolver_factory),
396       max_num_threads_(max_num_threads) {
397   DCHECK_GE(max_num_threads, 1u);
398 }
399
400 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
401   // We will cancel all outstanding requests.
402   pending_jobs_.clear();
403   ReleaseAllExecutors();
404 }
405
406 int MultiThreadedProxyResolver::GetProxyForURL(
407     const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
408     RequestHandle* request, const BoundNetLog& net_log) {
409   DCHECK(CalledOnValidThread());
410   DCHECK(!callback.is_null());
411   DCHECK(current_script_data_.get())
412       << "Resolver is un-initialized. Must call SetPacScript() first!";
413
414   scoped_refptr<GetProxyForURLJob> job(
415       new GetProxyForURLJob(url, results, callback, net_log));
416
417   // Completion will be notified through |callback|, unless the caller cancels
418   // the request using |request|.
419   if (request)
420     *request = reinterpret_cast<RequestHandle>(job.get());
421
422   // If there is an executor that is ready to run this request, submit it!
423   Executor* executor = FindIdleExecutor();
424   if (executor) {
425     DCHECK_EQ(0u, pending_jobs_.size());
426     executor->StartJob(job.get());
427     return ERR_IO_PENDING;
428   }
429
430   // Otherwise queue this request. (We will schedule it to a thread once one
431   // becomes available).
432   job->WaitingForThread();
433   pending_jobs_.push_back(job);
434
435   // If we haven't already reached the thread limit, provision a new thread to
436   // drain the requests more quickly.
437   if (executors_.size() < max_num_threads_) {
438     executor = AddNewExecutor();
439     executor->StartJob(
440         new SetPacScriptJob(current_script_data_, CompletionCallback()));
441   }
442
443   return ERR_IO_PENDING;
444 }
445
446 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
447   DCHECK(CalledOnValidThread());
448   DCHECK(req);
449
450   Job* job = reinterpret_cast<Job*>(req);
451   DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
452
453   if (job->executor()) {
454     // If the job was already submitted to the executor, just mark it
455     // as cancelled so the user callback isn't run on completion.
456     job->Cancel();
457   } else {
458     // Otherwise the job is just sitting in a queue.
459     PendingJobsQueue::iterator it =
460         std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
461     DCHECK(it != pending_jobs_.end());
462     pending_jobs_.erase(it);
463   }
464 }
465
466 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
467   DCHECK(CalledOnValidThread());
468   DCHECK(req);
469   return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
470 }
471
472 void MultiThreadedProxyResolver::CancelSetPacScript() {
473   DCHECK(CalledOnValidThread());
474   DCHECK_EQ(0u, pending_jobs_.size());
475   DCHECK_EQ(1u, executors_.size());
476   DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
477             executors_[0]->outstanding_job()->type());
478
479   // Defensively clear some data which shouldn't be getting used
480   // anymore.
481   current_script_data_ = NULL;
482
483   ReleaseAllExecutors();
484 }
485
486 void MultiThreadedProxyResolver::PurgeMemory() {
487   DCHECK(CalledOnValidThread());
488   for (ExecutorList::iterator it = executors_.begin();
489        it != executors_.end(); ++it) {
490     Executor* executor = it->get();
491     executor->PurgeMemory();
492   }
493 }
494
495 int MultiThreadedProxyResolver::SetPacScript(
496     const scoped_refptr<ProxyResolverScriptData>& script_data,
497     const CompletionCallback&callback) {
498   DCHECK(CalledOnValidThread());
499   DCHECK(!callback.is_null());
500
501   // Save the script details, so we can provision new executors later.
502   current_script_data_ = script_data;
503
504   // The user should not have any outstanding requests when they call
505   // SetPacScript().
506   CheckNoOutstandingUserRequests();
507
508   // Destroy all of the current threads and their proxy resolvers.
509   ReleaseAllExecutors();
510
511   // Provision a new executor, and run the SetPacScript request. On completion
512   // notification will be sent through |callback|.
513   Executor* executor = AddNewExecutor();
514   executor->StartJob(new SetPacScriptJob(script_data, callback));
515   return ERR_IO_PENDING;
516 }
517
518 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
519   DCHECK(CalledOnValidThread());
520   CHECK_EQ(0u, pending_jobs_.size());
521
522   for (ExecutorList::const_iterator it = executors_.begin();
523        it != executors_.end(); ++it) {
524     const Executor* executor = it->get();
525     Job* job = executor->outstanding_job();
526     // The "has_user_callback()" is to exclude jobs for which the callback
527     // has already been invoked, or was not user-initiated (as in the case of
528     // lazy thread provisions). User-initiated jobs may !has_user_callback()
529     // when the callback has already been run. (Since we only clear the
530     // outstanding job AFTER the callback has been invoked, it is possible
531     // for a new request to be started from within the callback).
532     CHECK(!job || job->was_cancelled() || !job->has_user_callback());
533   }
534 }
535
536 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
537   DCHECK(CalledOnValidThread());
538   for (ExecutorList::iterator it = executors_.begin();
539        it != executors_.end(); ++it) {
540     Executor* executor = it->get();
541     executor->Destroy();
542   }
543   executors_.clear();
544 }
545
546 MultiThreadedProxyResolver::Executor*
547 MultiThreadedProxyResolver::FindIdleExecutor() {
548   DCHECK(CalledOnValidThread());
549   for (ExecutorList::iterator it = executors_.begin();
550        it != executors_.end(); ++it) {
551     Executor* executor = it->get();
552     if (!executor->outstanding_job())
553       return executor;
554   }
555   return NULL;
556 }
557
558 MultiThreadedProxyResolver::Executor*
559 MultiThreadedProxyResolver::AddNewExecutor() {
560   DCHECK(CalledOnValidThread());
561   DCHECK_LT(executors_.size(), max_num_threads_);
562   // The "thread number" is used to give the thread a unique name.
563   int thread_number = executors_.size();
564   ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
565   Executor* executor = new Executor(
566       this, resolver, thread_number);
567   executors_.push_back(make_scoped_refptr(executor));
568   return executor;
569 }
570
571 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
572   DCHECK(CalledOnValidThread());
573   if (pending_jobs_.empty())
574     return;
575
576   // Get the next job to process (FIFO). Transfer it from the pending queue
577   // to the executor.
578   scoped_refptr<Job> job = pending_jobs_.front();
579   pending_jobs_.pop_front();
580   executor->StartJob(job.get());
581 }
582
583 }  // namespace net