Upstream version 5.34.104.0
[platform/framework/web/crosswalk.git] / src / chrome / browser / chromeos / drive / job_scheduler.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
6
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/chromeos/drive/file_system_util.h"
13 #include "chrome/browser/drive/event_logger.h"
14 #include "chrome/common/pref_names.h"
15 #include "content/public/browser/browser_thread.h"
16 #include "google_apis/drive/drive_api_parser.h"
17
18 using content::BrowserThread;
19
20 namespace drive {
21
22 namespace {
23
24 // All jobs are retried at maximum of kMaxRetryCount when they fail due to
25 // throttling or server error.  The delay before retrying a job is shared among
26 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds.
27 //
28 // According to the API documentation, kMaxRetryCount should be the same as
29 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
30 // But currently multiplied by 2 to ensure upload related jobs retried for a
31 // sufficient number of times. crbug.com/269918
32 const int kMaxThrottleCount = 4;
33 const int kMaxRetryCount = 2 * kMaxThrottleCount;
34
35 // GetDefaultValue returns a value constructed by the default constructor.
36 template<typename T> struct DefaultValueCreator {
37   static T GetDefaultValue() { return T(); }
38 };
39 template<typename T> struct DefaultValueCreator<const T&> {
40   static T GetDefaultValue() { return T(); }
41 };
42
43 // Helper of CreateErrorRunCallback implementation.
44 // Provides:
45 // - ResultType; the type of the Callback which should be returned by
46 //     CreateErrorRunCallback.
47 // - Run(): a static function which takes the original |callback| and |error|,
48 //     and runs the |callback|.Run() with the error code and default values
49 //     for remaining arguments.
50 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
51
52 // CreateErrorRunCallback with two arguments.
53 template<typename P1>
54 struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> {
55   static void Run(
56       const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback,
57       google_apis::GDataErrorCode error) {
58     callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
59   }
60 };
61
62 // Returns a callback with the tail parameter bound to its default value.
63 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
64 template<typename CallbackType>
65 base::Callback<void(google_apis::GDataErrorCode)>
66 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
67   return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
68 }
69
70 // Parameter struct for RunUploadNewFile.
71 struct UploadNewFileParams {
72   std::string parent_resource_id;
73   base::FilePath local_file_path;
74   std::string title;
75   std::string content_type;
76   DriveUploader::UploadNewFileOptions options;
77   UploadCompletionCallback callback;
78   google_apis::ProgressCallback progress_callback;
79 };
80
81 // Helper function to work around the arity limitation of base::Bind.
82 google_apis::CancelCallback RunUploadNewFile(
83     DriveUploaderInterface* uploader,
84     const UploadNewFileParams& params) {
85   return uploader->UploadNewFile(params.parent_resource_id,
86                                  params.local_file_path,
87                                  params.title,
88                                  params.content_type,
89                                  params.options,
90                                  params.callback,
91                                  params.progress_callback);
92 }
93
94 // Parameter struct for RunUploadExistingFile.
95 struct UploadExistingFileParams {
96   std::string resource_id;
97   base::FilePath local_file_path;
98   std::string content_type;
99   DriveUploader::UploadExistingFileOptions options;
100   std::string etag;
101   UploadCompletionCallback callback;
102   google_apis::ProgressCallback progress_callback;
103 };
104
105 // Helper function to work around the arity limitation of base::Bind.
106 google_apis::CancelCallback RunUploadExistingFile(
107     DriveUploaderInterface* uploader,
108     const UploadExistingFileParams& params) {
109   return uploader->UploadExistingFile(params.resource_id,
110                                       params.local_file_path,
111                                       params.content_type,
112                                       params.options,
113                                       params.callback,
114                                       params.progress_callback);
115 }
116
117 // Parameter struct for RunResumeUploadFile.
118 struct ResumeUploadFileParams {
119   GURL upload_location;
120   base::FilePath local_file_path;
121   std::string content_type;
122   UploadCompletionCallback callback;
123   google_apis::ProgressCallback progress_callback;
124 };
125
126 // Helper function to adjust the return type.
127 google_apis::CancelCallback RunResumeUploadFile(
128     DriveUploaderInterface* uploader,
129     const ResumeUploadFileParams& params) {
130   return uploader->ResumeUploadFile(params.upload_location,
131                                     params.local_file_path,
132                                     params.content_type,
133                                     params.callback,
134                                     params.progress_callback);
135 }
136
137 }  // namespace
138
139 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
140 const int JobScheduler::kMaxJobCount[] = {
141   5,  // METADATA_QUEUE
142   1,  // FILE_QUEUE
143 };
144
145 JobScheduler::JobEntry::JobEntry(JobType type)
146     : job_info(type),
147       context(ClientContext(USER_INITIATED)),
148       retry_count(0) {
149   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
150 }
151
152 JobScheduler::JobEntry::~JobEntry() {
153   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
154 }
155
156 struct JobScheduler::ResumeUploadParams {
157   base::FilePath drive_file_path;
158   base::FilePath local_file_path;
159   std::string content_type;
160 };
161
162 JobScheduler::JobScheduler(
163     PrefService* pref_service,
164     EventLogger* logger,
165     DriveServiceInterface* drive_service,
166     base::SequencedTaskRunner* blocking_task_runner)
167     : throttle_count_(0),
168       wait_until_(base::Time::Now()),
169       disable_throttling_(false),
170       logger_(logger),
171       drive_service_(drive_service),
172       uploader_(new DriveUploader(drive_service, blocking_task_runner)),
173       pref_service_(pref_service),
174       weak_ptr_factory_(this) {
175   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
176
177   for (int i = 0; i < NUM_QUEUES; ++i)
178     queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
179
180   net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
181 }
182
183 JobScheduler::~JobScheduler() {
184   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
185
186   size_t num_queued_jobs = 0;
187   for (int i = 0; i < NUM_QUEUES; ++i)
188     num_queued_jobs += queue_[i]->GetNumberOfJobs();
189   DCHECK_EQ(num_queued_jobs, job_map_.size());
190
191   net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
192 }
193
194 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
195   std::vector<JobInfo> job_info_list;
196   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
197     job_info_list.push_back(iter.GetCurrentValue()->job_info);
198   return job_info_list;
199 }
200
201 void JobScheduler::AddObserver(JobListObserver* observer) {
202   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
203   observer_list_.AddObserver(observer);
204 }
205
206 void JobScheduler::RemoveObserver(JobListObserver* observer) {
207   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
208   observer_list_.RemoveObserver(observer);
209 }
210
211 void JobScheduler::CancelJob(JobID job_id) {
212   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
213
214   JobEntry* job = job_map_.Lookup(job_id);
215   if (job) {
216     if (job->job_info.state == STATE_RUNNING) {
217       // If the job is running an HTTP request, cancel it via |cancel_callback|
218       // returned from the request, and wait for termination in the normal
219       // callback handler, OnJobDone.
220       if (!job->cancel_callback.is_null())
221         job->cancel_callback.Run();
222     } else {
223       AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
224     }
225   }
226 }
227
228 void JobScheduler::CancelAllJobs() {
229   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
230
231   // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
232   // removable during iteration.
233   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
234     CancelJob(iter.GetCurrentKey());
235 }
236
237 void JobScheduler::GetAboutResource(
238     const google_apis::AboutResourceCallback& callback) {
239   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
240   DCHECK(!callback.is_null());
241
242   JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
243   new_job->task = base::Bind(
244       &DriveServiceInterface::GetAboutResource,
245       base::Unretained(drive_service_),
246       base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
247                  weak_ptr_factory_.GetWeakPtr(),
248                  new_job->job_info.job_id,
249                  callback));
250   new_job->abort_callback = CreateErrorRunCallback(callback);
251   StartJob(new_job);
252 }
253
254 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
255   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
256   DCHECK(!callback.is_null());
257
258   JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
259   new_job->task = base::Bind(
260       &DriveServiceInterface::GetAppList,
261       base::Unretained(drive_service_),
262       base::Bind(&JobScheduler::OnGetAppListJobDone,
263                  weak_ptr_factory_.GetWeakPtr(),
264                  new_job->job_info.job_id,
265                  callback));
266   new_job->abort_callback = CreateErrorRunCallback(callback);
267   StartJob(new_job);
268 }
269
270 void JobScheduler::GetAllResourceList(
271     const google_apis::GetResourceListCallback& callback) {
272   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
273   DCHECK(!callback.is_null());
274
275   JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
276   new_job->task = base::Bind(
277       &DriveServiceInterface::GetAllResourceList,
278       base::Unretained(drive_service_),
279       base::Bind(&JobScheduler::OnGetResourceListJobDone,
280                  weak_ptr_factory_.GetWeakPtr(),
281                  new_job->job_info.job_id,
282                  callback));
283   new_job->abort_callback = CreateErrorRunCallback(callback);
284   StartJob(new_job);
285 }
286
287 void JobScheduler::GetResourceListInDirectory(
288     const std::string& directory_resource_id,
289     const google_apis::GetResourceListCallback& callback) {
290   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
291   DCHECK(!callback.is_null());
292
293   JobEntry* new_job = CreateNewJob(
294       TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
295   new_job->task = base::Bind(
296       &DriveServiceInterface::GetResourceListInDirectory,
297       base::Unretained(drive_service_),
298       directory_resource_id,
299       base::Bind(&JobScheduler::OnGetResourceListJobDone,
300                  weak_ptr_factory_.GetWeakPtr(),
301                  new_job->job_info.job_id,
302                  callback));
303   new_job->abort_callback = CreateErrorRunCallback(callback);
304   StartJob(new_job);
305 }
306
307 void JobScheduler::Search(
308     const std::string& search_query,
309     const google_apis::GetResourceListCallback& callback) {
310   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
311   DCHECK(!callback.is_null());
312
313   JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
314   new_job->task = base::Bind(
315       &DriveServiceInterface::Search,
316       base::Unretained(drive_service_),
317       search_query,
318       base::Bind(&JobScheduler::OnGetResourceListJobDone,
319                  weak_ptr_factory_.GetWeakPtr(),
320                  new_job->job_info.job_id,
321                  callback));
322   new_job->abort_callback = CreateErrorRunCallback(callback);
323   StartJob(new_job);
324 }
325
326 void JobScheduler::GetChangeList(
327     int64 start_changestamp,
328     const google_apis::GetResourceListCallback& callback) {
329   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
330   DCHECK(!callback.is_null());
331
332   JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
333   new_job->task = base::Bind(
334       &DriveServiceInterface::GetChangeList,
335       base::Unretained(drive_service_),
336       start_changestamp,
337       base::Bind(&JobScheduler::OnGetResourceListJobDone,
338                  weak_ptr_factory_.GetWeakPtr(),
339                  new_job->job_info.job_id,
340                  callback));
341   new_job->abort_callback = CreateErrorRunCallback(callback);
342   StartJob(new_job);
343 }
344
345 void JobScheduler::GetRemainingChangeList(
346     const GURL& next_link,
347     const google_apis::GetResourceListCallback& callback) {
348   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
349   DCHECK(!callback.is_null());
350
351   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
352   new_job->task = base::Bind(
353       &DriveServiceInterface::GetRemainingChangeList,
354       base::Unretained(drive_service_),
355       next_link,
356       base::Bind(&JobScheduler::OnGetResourceListJobDone,
357                  weak_ptr_factory_.GetWeakPtr(),
358                  new_job->job_info.job_id,
359                  callback));
360   new_job->abort_callback = CreateErrorRunCallback(callback);
361   StartJob(new_job);
362 }
363
364 void JobScheduler::GetRemainingFileList(
365     const GURL& next_link,
366     const google_apis::GetResourceListCallback& callback) {
367   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
368   DCHECK(!callback.is_null());
369
370   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
371   new_job->task = base::Bind(
372       &DriveServiceInterface::GetRemainingFileList,
373       base::Unretained(drive_service_),
374       next_link,
375       base::Bind(&JobScheduler::OnGetResourceListJobDone,
376                  weak_ptr_factory_.GetWeakPtr(),
377                  new_job->job_info.job_id,
378                  callback));
379   new_job->abort_callback = CreateErrorRunCallback(callback);
380   StartJob(new_job);
381 }
382
383 void JobScheduler::GetResourceEntry(
384     const std::string& resource_id,
385     const ClientContext& context,
386     const google_apis::GetResourceEntryCallback& callback) {
387   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
388   DCHECK(!callback.is_null());
389
390   JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
391   new_job->context = context;
392   new_job->task = base::Bind(
393       &DriveServiceInterface::GetResourceEntry,
394       base::Unretained(drive_service_),
395       resource_id,
396       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
397                  weak_ptr_factory_.GetWeakPtr(),
398                  new_job->job_info.job_id,
399                  callback));
400   new_job->abort_callback = CreateErrorRunCallback(callback);
401   StartJob(new_job);
402 }
403
404 void JobScheduler::GetShareUrl(
405     const std::string& resource_id,
406     const GURL& embed_origin,
407     const ClientContext& context,
408     const google_apis::GetShareUrlCallback& callback) {
409   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
410   DCHECK(!callback.is_null());
411
412   JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
413   new_job->context = context;
414   new_job->task = base::Bind(
415       &DriveServiceInterface::GetShareUrl,
416       base::Unretained(drive_service_),
417       resource_id,
418       embed_origin,
419       base::Bind(&JobScheduler::OnGetShareUrlJobDone,
420                  weak_ptr_factory_.GetWeakPtr(),
421                  new_job->job_info.job_id,
422                  callback));
423   new_job->abort_callback = CreateErrorRunCallback(callback);
424   StartJob(new_job);
425 }
426
427 void JobScheduler::TrashResource(
428     const std::string& resource_id,
429     const ClientContext& context,
430     const google_apis::EntryActionCallback& callback) {
431   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
432   DCHECK(!callback.is_null());
433
434   JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
435   new_job->context = context;
436   new_job->task = base::Bind(
437       &DriveServiceInterface::TrashResource,
438       base::Unretained(drive_service_),
439       resource_id,
440       base::Bind(&JobScheduler::OnEntryActionJobDone,
441                  weak_ptr_factory_.GetWeakPtr(),
442                  new_job->job_info.job_id,
443                  callback));
444   new_job->abort_callback = callback;
445   StartJob(new_job);
446 }
447
448 void JobScheduler::CopyResource(
449     const std::string& resource_id,
450     const std::string& parent_resource_id,
451     const std::string& new_title,
452     const base::Time& last_modified,
453     const google_apis::GetResourceEntryCallback& callback) {
454   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
455   DCHECK(!callback.is_null());
456
457   JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
458   new_job->task = base::Bind(
459       &DriveServiceInterface::CopyResource,
460       base::Unretained(drive_service_),
461       resource_id,
462       parent_resource_id,
463       new_title,
464       last_modified,
465       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
466                  weak_ptr_factory_.GetWeakPtr(),
467                  new_job->job_info.job_id,
468                  callback));
469   new_job->abort_callback = CreateErrorRunCallback(callback);
470   StartJob(new_job);
471 }
472
473 void JobScheduler::UpdateResource(
474     const std::string& resource_id,
475     const std::string& parent_resource_id,
476     const std::string& new_title,
477     const base::Time& last_modified,
478     const base::Time& last_viewed_by_me,
479     const ClientContext& context,
480     const google_apis::GetResourceEntryCallback& callback) {
481   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
482   DCHECK(!callback.is_null());
483
484   JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
485   new_job->context = context;
486   new_job->task = base::Bind(
487       &DriveServiceInterface::UpdateResource,
488       base::Unretained(drive_service_),
489       resource_id,
490       parent_resource_id,
491       new_title,
492       last_modified,
493       last_viewed_by_me,
494       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
495                  weak_ptr_factory_.GetWeakPtr(),
496                  new_job->job_info.job_id,
497                  callback));
498   new_job->abort_callback = CreateErrorRunCallback(callback);
499   StartJob(new_job);
500 }
501
502 void JobScheduler::RenameResource(
503     const std::string& resource_id,
504     const std::string& new_title,
505     const google_apis::EntryActionCallback& callback) {
506   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
507   DCHECK(!callback.is_null());
508
509   JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
510   new_job->task = base::Bind(
511       &DriveServiceInterface::RenameResource,
512       base::Unretained(drive_service_),
513       resource_id,
514       new_title,
515       base::Bind(&JobScheduler::OnEntryActionJobDone,
516                  weak_ptr_factory_.GetWeakPtr(),
517                  new_job->job_info.job_id,
518                  callback));
519   new_job->abort_callback = callback;
520   StartJob(new_job);
521 }
522
523 void JobScheduler::AddResourceToDirectory(
524     const std::string& parent_resource_id,
525     const std::string& resource_id,
526     const google_apis::EntryActionCallback& callback) {
527   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
528   DCHECK(!callback.is_null());
529
530   JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
531   new_job->task = base::Bind(
532       &DriveServiceInterface::AddResourceToDirectory,
533       base::Unretained(drive_service_),
534       parent_resource_id,
535       resource_id,
536       base::Bind(&JobScheduler::OnEntryActionJobDone,
537                  weak_ptr_factory_.GetWeakPtr(),
538                  new_job->job_info.job_id,
539                  callback));
540   new_job->abort_callback = callback;
541   StartJob(new_job);
542 }
543
544 void JobScheduler::RemoveResourceFromDirectory(
545     const std::string& parent_resource_id,
546     const std::string& resource_id,
547     const ClientContext& context,
548     const google_apis::EntryActionCallback& callback) {
549   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
550
551   JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
552   new_job->context = context;
553   new_job->task = base::Bind(
554       &DriveServiceInterface::RemoveResourceFromDirectory,
555       base::Unretained(drive_service_),
556       parent_resource_id,
557       resource_id,
558       base::Bind(&JobScheduler::OnEntryActionJobDone,
559                  weak_ptr_factory_.GetWeakPtr(),
560                  new_job->job_info.job_id,
561                  callback));
562   new_job->abort_callback = callback;
563   StartJob(new_job);
564 }
565
566 void JobScheduler::AddNewDirectory(
567     const std::string& parent_resource_id,
568     const std::string& directory_title,
569     const DriveServiceInterface::AddNewDirectoryOptions& options,
570     const ClientContext& context,
571     const google_apis::GetResourceEntryCallback& callback) {
572   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
573
574   JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
575   new_job->context = context;
576   new_job->task = base::Bind(
577       &DriveServiceInterface::AddNewDirectory,
578       base::Unretained(drive_service_),
579       parent_resource_id,
580       directory_title,
581       options,
582       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
583                  weak_ptr_factory_.GetWeakPtr(),
584                  new_job->job_info.job_id,
585                  callback));
586   new_job->abort_callback = CreateErrorRunCallback(callback);
587   StartJob(new_job);
588 }
589
590 JobID JobScheduler::DownloadFile(
591     const base::FilePath& virtual_path,
592     int64 expected_file_size,
593     const base::FilePath& local_cache_path,
594     const std::string& resource_id,
595     const ClientContext& context,
596     const google_apis::DownloadActionCallback& download_action_callback,
597     const google_apis::GetContentCallback& get_content_callback) {
598   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
599
600   JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
601   new_job->job_info.file_path = virtual_path;
602   new_job->job_info.num_total_bytes = expected_file_size;
603   new_job->context = context;
604   new_job->task = base::Bind(
605       &DriveServiceInterface::DownloadFile,
606       base::Unretained(drive_service_),
607       local_cache_path,
608       resource_id,
609       base::Bind(&JobScheduler::OnDownloadActionJobDone,
610                  weak_ptr_factory_.GetWeakPtr(),
611                  new_job->job_info.job_id,
612                  download_action_callback),
613       get_content_callback,
614       base::Bind(&JobScheduler::UpdateProgress,
615                  weak_ptr_factory_.GetWeakPtr(),
616                  new_job->job_info.job_id));
617   new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
618   StartJob(new_job);
619   return new_job->job_info.job_id;
620 }
621
622 void JobScheduler::UploadNewFile(
623     const std::string& parent_resource_id,
624     const base::FilePath& drive_file_path,
625     const base::FilePath& local_file_path,
626     const std::string& title,
627     const std::string& content_type,
628     const DriveUploader::UploadNewFileOptions& options,
629     const ClientContext& context,
630     const google_apis::GetResourceEntryCallback& callback) {
631   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
632
633   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
634   new_job->job_info.file_path = drive_file_path;
635   new_job->context = context;
636
637   UploadNewFileParams params;
638   params.parent_resource_id = parent_resource_id;
639   params.local_file_path = local_file_path;
640   params.title = title;
641   params.content_type = content_type;
642   params.options = options;
643
644   ResumeUploadParams resume_params;
645   resume_params.local_file_path = params.local_file_path;
646   resume_params.content_type = params.content_type;
647
648   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
649                                weak_ptr_factory_.GetWeakPtr(),
650                                new_job->job_info.job_id,
651                                resume_params,
652                                callback);
653   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
654                                         weak_ptr_factory_.GetWeakPtr(),
655                                         new_job->job_info.job_id);
656   new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
657   new_job->abort_callback = CreateErrorRunCallback(callback);
658   StartJob(new_job);
659 }
660
661 void JobScheduler::UploadExistingFile(
662     const std::string& resource_id,
663     const base::FilePath& drive_file_path,
664     const base::FilePath& local_file_path,
665     const std::string& content_type,
666     const DriveUploader::UploadExistingFileOptions& options,
667     const ClientContext& context,
668     const google_apis::GetResourceEntryCallback& callback) {
669   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
670
671   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
672   new_job->job_info.file_path = drive_file_path;
673   new_job->context = context;
674
675   UploadExistingFileParams params;
676   params.resource_id = resource_id;
677   params.local_file_path = local_file_path;
678   params.content_type = content_type;
679   params.options = options;
680
681   ResumeUploadParams resume_params;
682   resume_params.local_file_path = params.local_file_path;
683   resume_params.content_type = params.content_type;
684
685   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
686                                weak_ptr_factory_.GetWeakPtr(),
687                                new_job->job_info.job_id,
688                                resume_params,
689                                callback);
690   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
691                                         weak_ptr_factory_.GetWeakPtr(),
692                                         new_job->job_info.job_id);
693   new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
694   new_job->abort_callback = CreateErrorRunCallback(callback);
695   StartJob(new_job);
696 }
697
698 void JobScheduler::GetResourceListInDirectoryByWapi(
699     const std::string& directory_resource_id,
700     const google_apis::GetResourceListCallback& callback) {
701   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
702   DCHECK(!callback.is_null());
703
704   JobEntry* new_job = CreateNewJob(
705       TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI);
706   new_job->task = base::Bind(
707       &DriveServiceInterface::GetResourceListInDirectoryByWapi,
708       base::Unretained(drive_service_),
709       directory_resource_id,
710       base::Bind(&JobScheduler::OnGetResourceListJobDone,
711                  weak_ptr_factory_.GetWeakPtr(),
712                  new_job->job_info.job_id,
713                  callback));
714   new_job->abort_callback = CreateErrorRunCallback(callback);
715   StartJob(new_job);
716 }
717
718 void JobScheduler::GetRemainingResourceList(
719     const GURL& next_link,
720     const google_apis::GetResourceListCallback& callback) {
721   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
722   DCHECK(!callback.is_null());
723
724   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_RESOURCE_LIST);
725   new_job->task = base::Bind(
726       &DriveServiceInterface::GetRemainingResourceList,
727       base::Unretained(drive_service_),
728       next_link,
729       base::Bind(&JobScheduler::OnGetResourceListJobDone,
730                  weak_ptr_factory_.GetWeakPtr(),
731                  new_job->job_info.job_id,
732                  callback));
733   new_job->abort_callback = CreateErrorRunCallback(callback);
734   StartJob(new_job);
735 }
736
737 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
738   JobEntry* job = new JobEntry(type);
739   job->job_info.job_id = job_map_.Add(job);  // Takes the ownership of |job|.
740   return job;
741 }
742
743 void JobScheduler::StartJob(JobEntry* job) {
744   DCHECK(!job->task.is_null());
745
746   QueueJob(job->job_info.job_id);
747   NotifyJobAdded(job->job_info);
748   DoJobLoop(GetJobQueueType(job->job_info.job_type));
749 }
750
751 void JobScheduler::QueueJob(JobID job_id) {
752   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
753
754   JobEntry* job_entry = job_map_.Lookup(job_id);
755   DCHECK(job_entry);
756   const JobInfo& job_info = job_entry->job_info;
757
758   QueueType queue_type = GetJobQueueType(job_info.job_type);
759   queue_[queue_type]->Push(job_id, job_entry->context.type);
760
761   const std::string retry_prefix = job_entry->retry_count > 0 ?
762       base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
763   logger_->Log(logging::LOG_INFO,
764                "Job queued%s: %s - %s",
765                retry_prefix.c_str(),
766                job_info.ToString().c_str(),
767                GetQueueInfo(queue_type).c_str());
768 }
769
770 void JobScheduler::DoJobLoop(QueueType queue_type) {
771   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
772
773   const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
774
775   // Abort all USER_INITAITED jobs when not accepted.
776   if (accepted_priority < USER_INITIATED) {
777     std::vector<JobID> jobs;
778     queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
779     for (size_t i = 0; i < jobs.size(); ++i) {
780       JobEntry* job = job_map_.Lookup(jobs[i]);
781       DCHECK(job);
782       AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
783     }
784   }
785
786   // Wait when throttled.
787   const base::Time now = base::Time::Now();
788   if (now < wait_until_) {
789     base::MessageLoopProxy::current()->PostDelayedTask(
790         FROM_HERE,
791         base::Bind(&JobScheduler::DoJobLoop,
792                    weak_ptr_factory_.GetWeakPtr(),
793                    queue_type),
794         wait_until_ - now);
795     return;
796   }
797
798   // Run the job with the highest priority in the queue.
799   JobID job_id = -1;
800   if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
801     return;
802
803   JobEntry* entry = job_map_.Lookup(job_id);
804   DCHECK(entry);
805
806   JobInfo* job_info = &entry->job_info;
807   job_info->state = STATE_RUNNING;
808   job_info->start_time = now;
809   NotifyJobUpdated(*job_info);
810
811   entry->cancel_callback = entry->task.Run();
812
813   UpdateWait();
814
815   logger_->Log(logging::LOG_INFO,
816                "Job started: %s - %s",
817                job_info->ToString().c_str(),
818                GetQueueInfo(queue_type).c_str());
819 }
820
821 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
822   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
823
824   const int kNoJobShouldRun = -1;
825
826   // Should stop if Drive was disabled while running the fetch loop.
827   if (pref_service_->GetBoolean(prefs::kDisableDrive))
828     return kNoJobShouldRun;
829
830   // Should stop if the network is not online.
831   if (net::NetworkChangeNotifier::IsOffline())
832     return kNoJobShouldRun;
833
834   // For the file queue, if it is on cellular network, only user initiated
835   // operations are allowed to start.
836   if (queue_type == FILE_QUEUE &&
837       pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
838       net::NetworkChangeNotifier::IsConnectionCellular(
839           net::NetworkChangeNotifier::GetConnectionType()))
840     return USER_INITIATED;
841
842   // Otherwise, every operations including background tasks are allowed.
843   return BACKGROUND;
844 }
845
846 void JobScheduler::UpdateWait() {
847   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
848
849   if (disable_throttling_ || throttle_count_ == 0)
850     return;
851
852   // Exponential backoff: https://developers.google.com/drive/handle-errors.
853   base::TimeDelta delay =
854       base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
855       base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
856   VLOG(1) << "Throttling for " << delay.InMillisecondsF();
857
858   wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
859 }
860
861 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
862   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
863
864   JobEntry* job_entry = job_map_.Lookup(job_id);
865   DCHECK(job_entry);
866   JobInfo* job_info = &job_entry->job_info;
867   QueueType queue_type = GetJobQueueType(job_info->job_type);
868   queue_[queue_type]->MarkFinished(job_id);
869
870   const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
871   bool success = (GDataToFileError(error) == FILE_ERROR_OK);
872   logger_->Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
873                "Job done: %s => %s (elapsed time: %sms) - %s",
874                job_info->ToString().c_str(),
875                GDataErrorCodeToString(error).c_str(),
876                base::Int64ToString(elapsed.InMilliseconds()).c_str(),
877                GetQueueInfo(queue_type).c_str());
878
879   // Retry, depending on the error.
880   const bool is_server_error =
881       error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
882       error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
883   if (is_server_error) {
884     if (throttle_count_ < kMaxThrottleCount)
885       ++throttle_count_;
886     UpdateWait();
887   } else {
888     throttle_count_ = 0;
889   }
890
891   const bool should_retry =
892       is_server_error && job_entry->retry_count < kMaxRetryCount;
893   if (should_retry) {
894     job_entry->cancel_callback.Reset();
895     job_info->state = STATE_RETRY;
896     NotifyJobUpdated(*job_info);
897
898     ++job_entry->retry_count;
899
900     // Requeue the job.
901     QueueJob(job_id);
902   } else {
903     NotifyJobDone(*job_info, error);
904     // The job has finished, no retry will happen in the scheduler. Now we can
905     // remove the job info from the map.
906     job_map_.Remove(job_id);
907   }
908
909   // Post a task to continue the job loop.  This allows us to finish handling
910   // the current job before starting the next one.
911   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
912       base::Bind(&JobScheduler::DoJobLoop,
913                  weak_ptr_factory_.GetWeakPtr(),
914                  queue_type));
915   return !should_retry;
916 }
917
918 void JobScheduler::OnGetResourceListJobDone(
919     JobID job_id,
920     const google_apis::GetResourceListCallback& callback,
921     google_apis::GDataErrorCode error,
922     scoped_ptr<google_apis::ResourceList> resource_list) {
923   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
924   DCHECK(!callback.is_null());
925
926   if (OnJobDone(job_id, error))
927     callback.Run(error, resource_list.Pass());
928 }
929
930 void JobScheduler::OnGetResourceEntryJobDone(
931     JobID job_id,
932     const google_apis::GetResourceEntryCallback& callback,
933     google_apis::GDataErrorCode error,
934     scoped_ptr<google_apis::ResourceEntry> entry) {
935   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
936   DCHECK(!callback.is_null());
937
938   if (OnJobDone(job_id, error))
939     callback.Run(error, entry.Pass());
940 }
941
942 void JobScheduler::OnGetAboutResourceJobDone(
943     JobID job_id,
944     const google_apis::AboutResourceCallback& callback,
945     google_apis::GDataErrorCode error,
946     scoped_ptr<google_apis::AboutResource> about_resource) {
947   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
948   DCHECK(!callback.is_null());
949
950   if (OnJobDone(job_id, error))
951     callback.Run(error, about_resource.Pass());
952 }
953
954 void JobScheduler::OnGetShareUrlJobDone(
955     JobID job_id,
956     const google_apis::GetShareUrlCallback& callback,
957     google_apis::GDataErrorCode error,
958     const GURL& share_url) {
959   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
960   DCHECK(!callback.is_null());
961
962   if (OnJobDone(job_id, error))
963     callback.Run(error, share_url);
964 }
965
966 void JobScheduler::OnGetAppListJobDone(
967     JobID job_id,
968     const google_apis::AppListCallback& callback,
969     google_apis::GDataErrorCode error,
970     scoped_ptr<google_apis::AppList> app_list) {
971   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
972   DCHECK(!callback.is_null());
973
974   if (OnJobDone(job_id, error))
975     callback.Run(error, app_list.Pass());
976 }
977
978 void JobScheduler::OnEntryActionJobDone(
979     JobID job_id,
980     const google_apis::EntryActionCallback& callback,
981     google_apis::GDataErrorCode error) {
982   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
983   DCHECK(!callback.is_null());
984
985   if (OnJobDone(job_id, error))
986     callback.Run(error);
987 }
988
989 void JobScheduler::OnDownloadActionJobDone(
990     JobID job_id,
991     const google_apis::DownloadActionCallback& callback,
992     google_apis::GDataErrorCode error,
993     const base::FilePath& temp_file) {
994   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
995   DCHECK(!callback.is_null());
996
997   if (OnJobDone(job_id, error))
998     callback.Run(error, temp_file);
999 }
1000
1001 void JobScheduler::OnUploadCompletionJobDone(
1002     JobID job_id,
1003     const ResumeUploadParams& resume_params,
1004     const google_apis::GetResourceEntryCallback& callback,
1005     google_apis::GDataErrorCode error,
1006     const GURL& upload_location,
1007     scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1008   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1009   DCHECK(!callback.is_null());
1010
1011   if (!upload_location.is_empty()) {
1012     // If upload_location is available, update the task to resume the
1013     // upload process from the terminated point.
1014     // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1015     // so OnJobDone called below will be in charge to re-queue the job.
1016     JobEntry* job_entry = job_map_.Lookup(job_id);
1017     DCHECK(job_entry);
1018
1019     ResumeUploadFileParams params;
1020     params.upload_location = upload_location;
1021     params.local_file_path = resume_params.local_file_path;
1022     params.content_type = resume_params.content_type;
1023     params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1024                                  weak_ptr_factory_.GetWeakPtr(),
1025                                  job_id,
1026                                  job_entry->task,
1027                                  callback);
1028     params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1029                                           weak_ptr_factory_.GetWeakPtr(),
1030                                           job_id);
1031     job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1032   }
1033
1034   if (OnJobDone(job_id, error))
1035     callback.Run(error, resource_entry.Pass());
1036 }
1037
1038 void JobScheduler::OnResumeUploadFileDone(
1039     JobID job_id,
1040     const base::Callback<google_apis::CancelCallback()>& original_task,
1041     const google_apis::GetResourceEntryCallback& callback,
1042     google_apis::GDataErrorCode error,
1043     const GURL& upload_location,
1044     scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1045   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1046   DCHECK(!original_task.is_null());
1047   DCHECK(!callback.is_null());
1048
1049   if (upload_location.is_empty()) {
1050     // If upload_location is not available, we should discard it and stop trying
1051     // to resume. Restore the original task.
1052     JobEntry* job_entry = job_map_.Lookup(job_id);
1053     DCHECK(job_entry);
1054     job_entry->task = original_task;
1055   }
1056
1057   if (OnJobDone(job_id, error))
1058     callback.Run(error, resource_entry.Pass());
1059 }
1060
1061 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1062   JobEntry* job_entry = job_map_.Lookup(job_id);
1063   DCHECK(job_entry);
1064
1065   job_entry->job_info.num_completed_bytes = progress;
1066   if (total != -1)
1067     job_entry->job_info.num_total_bytes = total;
1068   NotifyJobUpdated(job_entry->job_info);
1069 }
1070
1071 void JobScheduler::OnConnectionTypeChanged(
1072     net::NetworkChangeNotifier::ConnectionType type) {
1073   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1074
1075   // Resume the job loop.
1076   // Note that we don't need to check the network connection status as it will
1077   // be checked in GetCurrentAcceptedPriority().
1078   for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1079     DoJobLoop(static_cast<QueueType>(i));
1080 }
1081
1082 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1083   switch (type) {
1084     case TYPE_GET_ABOUT_RESOURCE:
1085     case TYPE_GET_APP_LIST:
1086     case TYPE_GET_ALL_RESOURCE_LIST:
1087     case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1088     case TYPE_SEARCH:
1089     case TYPE_GET_CHANGE_LIST:
1090     case TYPE_GET_REMAINING_CHANGE_LIST:
1091     case TYPE_GET_REMAINING_FILE_LIST:
1092     case TYPE_GET_RESOURCE_ENTRY:
1093     case TYPE_GET_SHARE_URL:
1094     case TYPE_TRASH_RESOURCE:
1095     case TYPE_COPY_RESOURCE:
1096     case TYPE_UPDATE_RESOURCE:
1097     case TYPE_RENAME_RESOURCE:
1098     case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1099     case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1100     case TYPE_ADD_NEW_DIRECTORY:
1101     case TYPE_CREATE_FILE:
1102     case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI:
1103     case TYPE_GET_REMAINING_RESOURCE_LIST:
1104       return METADATA_QUEUE;
1105
1106     case TYPE_DOWNLOAD_FILE:
1107     case TYPE_UPLOAD_NEW_FILE:
1108     case TYPE_UPLOAD_EXISTING_FILE:
1109       return FILE_QUEUE;
1110   }
1111   NOTREACHED();
1112   return FILE_QUEUE;
1113 }
1114
1115 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1116                                       google_apis::GDataErrorCode error) {
1117   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1118
1119   const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1120   const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1121   logger_->Log(logging::LOG_INFO,
1122                "Job aborted: %s => %s (elapsed time: %sms) - %s",
1123                job->job_info.ToString().c_str(),
1124                GDataErrorCodeToString(error).c_str(),
1125                base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1126                GetQueueInfo(queue_type).c_str());
1127
1128   base::Callback<void(google_apis::GDataErrorCode)> callback =
1129       job->abort_callback;
1130   queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1131   NotifyJobDone(job->job_info, error);
1132   job_map_.Remove(job->job_info.job_id);
1133   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1134                                               base::Bind(callback, error));
1135 }
1136
1137 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1138   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1139   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1140 }
1141
1142 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1143                                  google_apis::GDataErrorCode error) {
1144   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1145   FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1146                     OnJobDone(job_info, GDataToFileError(error)));
1147 }
1148
1149 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1150   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1151   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1152 }
1153
1154 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1155   return QueueTypeToString(type) + " " + queue_[type]->ToString();
1156 }
1157
1158 // static
1159 std::string JobScheduler::QueueTypeToString(QueueType type) {
1160   switch (type) {
1161     case METADATA_QUEUE:
1162       return "METADATA_QUEUE";
1163     case FILE_QUEUE:
1164       return "FILE_QUEUE";
1165     case NUM_QUEUES:
1166       break;  // This value is just a sentinel. Should never be used.
1167   }
1168   NOTREACHED();
1169   return "";
1170 }
1171
1172 }  // namespace drive