1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/chromeos/drive/file_system_util.h"
13 #include "chrome/browser/drive/event_logger.h"
14 #include "chrome/common/pref_names.h"
15 #include "content/public/browser/browser_thread.h"
16 #include "google_apis/drive/drive_api_parser.h"
18 using content::BrowserThread;
24 // All jobs are retried at maximum of kMaxRetryCount when they fail due to
25 // throttling or server error. The delay before retrying a job is shared among
26 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds.
28 // According to the API documentation, kMaxRetryCount should be the same as
29 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
30 // But currently multiplied by 2 to ensure upload related jobs retried for a
31 // sufficient number of times. crbug.com/269918
32 const int kMaxThrottleCount = 4;
33 const int kMaxRetryCount = 2 * kMaxThrottleCount;
35 // GetDefaultValue returns a value constructed by the default constructor.
36 template<typename T> struct DefaultValueCreator {
37 static T GetDefaultValue() { return T(); }
39 template<typename T> struct DefaultValueCreator<const T&> {
40 static T GetDefaultValue() { return T(); }
43 // Helper of CreateErrorRunCallback implementation.
45 // - ResultType; the type of the Callback which should be returned by
46 // CreateErrorRunCallback.
47 // - Run(): a static function which takes the original |callback| and |error|,
48 // and runs the |callback|.Run() with the error code and default values
49 // for remaining arguments.
50 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
52 // CreateErrorRunCallback with two arguments.
54 struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> {
56 const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback,
57 google_apis::GDataErrorCode error) {
58 callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
62 // Returns a callback with the tail parameter bound to its default value.
63 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
64 template<typename CallbackType>
65 base::Callback<void(google_apis::GDataErrorCode)>
66 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
67 return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
70 // Parameter struct for RunUploadNewFile.
71 struct UploadNewFileParams {
72 std::string parent_resource_id;
73 base::FilePath local_file_path;
75 std::string content_type;
76 DriveUploader::UploadNewFileOptions options;
77 UploadCompletionCallback callback;
78 google_apis::ProgressCallback progress_callback;
81 // Helper function to work around the arity limitation of base::Bind.
82 google_apis::CancelCallback RunUploadNewFile(
83 DriveUploaderInterface* uploader,
84 const UploadNewFileParams& params) {
85 return uploader->UploadNewFile(params.parent_resource_id,
86 params.local_file_path,
91 params.progress_callback);
94 // Parameter struct for RunUploadExistingFile.
95 struct UploadExistingFileParams {
96 std::string resource_id;
97 base::FilePath local_file_path;
98 std::string content_type;
99 DriveUploader::UploadExistingFileOptions options;
101 UploadCompletionCallback callback;
102 google_apis::ProgressCallback progress_callback;
105 // Helper function to work around the arity limitation of base::Bind.
106 google_apis::CancelCallback RunUploadExistingFile(
107 DriveUploaderInterface* uploader,
108 const UploadExistingFileParams& params) {
109 return uploader->UploadExistingFile(params.resource_id,
110 params.local_file_path,
114 params.progress_callback);
117 // Parameter struct for RunResumeUploadFile.
118 struct ResumeUploadFileParams {
119 GURL upload_location;
120 base::FilePath local_file_path;
121 std::string content_type;
122 UploadCompletionCallback callback;
123 google_apis::ProgressCallback progress_callback;
126 // Helper function to adjust the return type.
127 google_apis::CancelCallback RunResumeUploadFile(
128 DriveUploaderInterface* uploader,
129 const ResumeUploadFileParams& params) {
130 return uploader->ResumeUploadFile(params.upload_location,
131 params.local_file_path,
134 params.progress_callback);
139 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
140 const int JobScheduler::kMaxJobCount[] = {
145 JobScheduler::JobEntry::JobEntry(JobType type)
147 context(ClientContext(USER_INITIATED)),
149 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
152 JobScheduler::JobEntry::~JobEntry() {
153 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
156 struct JobScheduler::ResumeUploadParams {
157 base::FilePath drive_file_path;
158 base::FilePath local_file_path;
159 std::string content_type;
162 JobScheduler::JobScheduler(
163 PrefService* pref_service,
165 DriveServiceInterface* drive_service,
166 base::SequencedTaskRunner* blocking_task_runner)
167 : throttle_count_(0),
168 wait_until_(base::Time::Now()),
169 disable_throttling_(false),
171 drive_service_(drive_service),
172 uploader_(new DriveUploader(drive_service, blocking_task_runner)),
173 pref_service_(pref_service),
174 weak_ptr_factory_(this) {
175 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
177 for (int i = 0; i < NUM_QUEUES; ++i)
178 queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
180 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
183 JobScheduler::~JobScheduler() {
184 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
186 size_t num_queued_jobs = 0;
187 for (int i = 0; i < NUM_QUEUES; ++i)
188 num_queued_jobs += queue_[i]->GetNumberOfJobs();
189 DCHECK_EQ(num_queued_jobs, job_map_.size());
191 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
194 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
195 std::vector<JobInfo> job_info_list;
196 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
197 job_info_list.push_back(iter.GetCurrentValue()->job_info);
198 return job_info_list;
201 void JobScheduler::AddObserver(JobListObserver* observer) {
202 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
203 observer_list_.AddObserver(observer);
206 void JobScheduler::RemoveObserver(JobListObserver* observer) {
207 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
208 observer_list_.RemoveObserver(observer);
211 void JobScheduler::CancelJob(JobID job_id) {
212 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
214 JobEntry* job = job_map_.Lookup(job_id);
216 if (job->job_info.state == STATE_RUNNING) {
217 // If the job is running an HTTP request, cancel it via |cancel_callback|
218 // returned from the request, and wait for termination in the normal
219 // callback handler, OnJobDone.
220 if (!job->cancel_callback.is_null())
221 job->cancel_callback.Run();
223 AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
228 void JobScheduler::CancelAllJobs() {
229 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
231 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
232 // removable during iteration.
233 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
234 CancelJob(iter.GetCurrentKey());
237 void JobScheduler::GetAboutResource(
238 const google_apis::AboutResourceCallback& callback) {
239 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
240 DCHECK(!callback.is_null());
242 JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
243 new_job->task = base::Bind(
244 &DriveServiceInterface::GetAboutResource,
245 base::Unretained(drive_service_),
246 base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
247 weak_ptr_factory_.GetWeakPtr(),
248 new_job->job_info.job_id,
250 new_job->abort_callback = CreateErrorRunCallback(callback);
254 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
255 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
256 DCHECK(!callback.is_null());
258 JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
259 new_job->task = base::Bind(
260 &DriveServiceInterface::GetAppList,
261 base::Unretained(drive_service_),
262 base::Bind(&JobScheduler::OnGetAppListJobDone,
263 weak_ptr_factory_.GetWeakPtr(),
264 new_job->job_info.job_id,
266 new_job->abort_callback = CreateErrorRunCallback(callback);
270 void JobScheduler::GetAllResourceList(
271 const google_apis::GetResourceListCallback& callback) {
272 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
273 DCHECK(!callback.is_null());
275 JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
276 new_job->task = base::Bind(
277 &DriveServiceInterface::GetAllResourceList,
278 base::Unretained(drive_service_),
279 base::Bind(&JobScheduler::OnGetResourceListJobDone,
280 weak_ptr_factory_.GetWeakPtr(),
281 new_job->job_info.job_id,
283 new_job->abort_callback = CreateErrorRunCallback(callback);
287 void JobScheduler::GetResourceListInDirectory(
288 const std::string& directory_resource_id,
289 const google_apis::GetResourceListCallback& callback) {
290 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
291 DCHECK(!callback.is_null());
293 JobEntry* new_job = CreateNewJob(
294 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
295 new_job->task = base::Bind(
296 &DriveServiceInterface::GetResourceListInDirectory,
297 base::Unretained(drive_service_),
298 directory_resource_id,
299 base::Bind(&JobScheduler::OnGetResourceListJobDone,
300 weak_ptr_factory_.GetWeakPtr(),
301 new_job->job_info.job_id,
303 new_job->abort_callback = CreateErrorRunCallback(callback);
307 void JobScheduler::Search(
308 const std::string& search_query,
309 const google_apis::GetResourceListCallback& callback) {
310 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
311 DCHECK(!callback.is_null());
313 JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
314 new_job->task = base::Bind(
315 &DriveServiceInterface::Search,
316 base::Unretained(drive_service_),
318 base::Bind(&JobScheduler::OnGetResourceListJobDone,
319 weak_ptr_factory_.GetWeakPtr(),
320 new_job->job_info.job_id,
322 new_job->abort_callback = CreateErrorRunCallback(callback);
326 void JobScheduler::GetChangeList(
327 int64 start_changestamp,
328 const google_apis::GetResourceListCallback& callback) {
329 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
330 DCHECK(!callback.is_null());
332 JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
333 new_job->task = base::Bind(
334 &DriveServiceInterface::GetChangeList,
335 base::Unretained(drive_service_),
337 base::Bind(&JobScheduler::OnGetResourceListJobDone,
338 weak_ptr_factory_.GetWeakPtr(),
339 new_job->job_info.job_id,
341 new_job->abort_callback = CreateErrorRunCallback(callback);
345 void JobScheduler::GetRemainingChangeList(
346 const GURL& next_link,
347 const google_apis::GetResourceListCallback& callback) {
348 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
349 DCHECK(!callback.is_null());
351 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
352 new_job->task = base::Bind(
353 &DriveServiceInterface::GetRemainingChangeList,
354 base::Unretained(drive_service_),
356 base::Bind(&JobScheduler::OnGetResourceListJobDone,
357 weak_ptr_factory_.GetWeakPtr(),
358 new_job->job_info.job_id,
360 new_job->abort_callback = CreateErrorRunCallback(callback);
364 void JobScheduler::GetRemainingFileList(
365 const GURL& next_link,
366 const google_apis::GetResourceListCallback& callback) {
367 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
368 DCHECK(!callback.is_null());
370 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
371 new_job->task = base::Bind(
372 &DriveServiceInterface::GetRemainingFileList,
373 base::Unretained(drive_service_),
375 base::Bind(&JobScheduler::OnGetResourceListJobDone,
376 weak_ptr_factory_.GetWeakPtr(),
377 new_job->job_info.job_id,
379 new_job->abort_callback = CreateErrorRunCallback(callback);
383 void JobScheduler::GetResourceEntry(
384 const std::string& resource_id,
385 const ClientContext& context,
386 const google_apis::GetResourceEntryCallback& callback) {
387 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
388 DCHECK(!callback.is_null());
390 JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
391 new_job->context = context;
392 new_job->task = base::Bind(
393 &DriveServiceInterface::GetResourceEntry,
394 base::Unretained(drive_service_),
396 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
397 weak_ptr_factory_.GetWeakPtr(),
398 new_job->job_info.job_id,
400 new_job->abort_callback = CreateErrorRunCallback(callback);
404 void JobScheduler::GetShareUrl(
405 const std::string& resource_id,
406 const GURL& embed_origin,
407 const ClientContext& context,
408 const google_apis::GetShareUrlCallback& callback) {
409 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
410 DCHECK(!callback.is_null());
412 JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
413 new_job->context = context;
414 new_job->task = base::Bind(
415 &DriveServiceInterface::GetShareUrl,
416 base::Unretained(drive_service_),
419 base::Bind(&JobScheduler::OnGetShareUrlJobDone,
420 weak_ptr_factory_.GetWeakPtr(),
421 new_job->job_info.job_id,
423 new_job->abort_callback = CreateErrorRunCallback(callback);
427 void JobScheduler::TrashResource(
428 const std::string& resource_id,
429 const ClientContext& context,
430 const google_apis::EntryActionCallback& callback) {
431 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
432 DCHECK(!callback.is_null());
434 JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
435 new_job->context = context;
436 new_job->task = base::Bind(
437 &DriveServiceInterface::TrashResource,
438 base::Unretained(drive_service_),
440 base::Bind(&JobScheduler::OnEntryActionJobDone,
441 weak_ptr_factory_.GetWeakPtr(),
442 new_job->job_info.job_id,
444 new_job->abort_callback = callback;
448 void JobScheduler::CopyResource(
449 const std::string& resource_id,
450 const std::string& parent_resource_id,
451 const std::string& new_title,
452 const base::Time& last_modified,
453 const google_apis::GetResourceEntryCallback& callback) {
454 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
455 DCHECK(!callback.is_null());
457 JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
458 new_job->task = base::Bind(
459 &DriveServiceInterface::CopyResource,
460 base::Unretained(drive_service_),
465 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
466 weak_ptr_factory_.GetWeakPtr(),
467 new_job->job_info.job_id,
469 new_job->abort_callback = CreateErrorRunCallback(callback);
473 void JobScheduler::UpdateResource(
474 const std::string& resource_id,
475 const std::string& parent_resource_id,
476 const std::string& new_title,
477 const base::Time& last_modified,
478 const base::Time& last_viewed_by_me,
479 const ClientContext& context,
480 const google_apis::GetResourceEntryCallback& callback) {
481 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
482 DCHECK(!callback.is_null());
484 JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
485 new_job->context = context;
486 new_job->task = base::Bind(
487 &DriveServiceInterface::UpdateResource,
488 base::Unretained(drive_service_),
494 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
495 weak_ptr_factory_.GetWeakPtr(),
496 new_job->job_info.job_id,
498 new_job->abort_callback = CreateErrorRunCallback(callback);
502 void JobScheduler::RenameResource(
503 const std::string& resource_id,
504 const std::string& new_title,
505 const google_apis::EntryActionCallback& callback) {
506 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
507 DCHECK(!callback.is_null());
509 JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
510 new_job->task = base::Bind(
511 &DriveServiceInterface::RenameResource,
512 base::Unretained(drive_service_),
515 base::Bind(&JobScheduler::OnEntryActionJobDone,
516 weak_ptr_factory_.GetWeakPtr(),
517 new_job->job_info.job_id,
519 new_job->abort_callback = callback;
523 void JobScheduler::AddResourceToDirectory(
524 const std::string& parent_resource_id,
525 const std::string& resource_id,
526 const google_apis::EntryActionCallback& callback) {
527 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
528 DCHECK(!callback.is_null());
530 JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
531 new_job->task = base::Bind(
532 &DriveServiceInterface::AddResourceToDirectory,
533 base::Unretained(drive_service_),
536 base::Bind(&JobScheduler::OnEntryActionJobDone,
537 weak_ptr_factory_.GetWeakPtr(),
538 new_job->job_info.job_id,
540 new_job->abort_callback = callback;
544 void JobScheduler::RemoveResourceFromDirectory(
545 const std::string& parent_resource_id,
546 const std::string& resource_id,
547 const ClientContext& context,
548 const google_apis::EntryActionCallback& callback) {
549 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
551 JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
552 new_job->context = context;
553 new_job->task = base::Bind(
554 &DriveServiceInterface::RemoveResourceFromDirectory,
555 base::Unretained(drive_service_),
558 base::Bind(&JobScheduler::OnEntryActionJobDone,
559 weak_ptr_factory_.GetWeakPtr(),
560 new_job->job_info.job_id,
562 new_job->abort_callback = callback;
566 void JobScheduler::AddNewDirectory(
567 const std::string& parent_resource_id,
568 const std::string& directory_title,
569 const DriveServiceInterface::AddNewDirectoryOptions& options,
570 const ClientContext& context,
571 const google_apis::GetResourceEntryCallback& callback) {
572 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
574 JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
575 new_job->context = context;
576 new_job->task = base::Bind(
577 &DriveServiceInterface::AddNewDirectory,
578 base::Unretained(drive_service_),
582 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
583 weak_ptr_factory_.GetWeakPtr(),
584 new_job->job_info.job_id,
586 new_job->abort_callback = CreateErrorRunCallback(callback);
590 JobID JobScheduler::DownloadFile(
591 const base::FilePath& virtual_path,
592 int64 expected_file_size,
593 const base::FilePath& local_cache_path,
594 const std::string& resource_id,
595 const ClientContext& context,
596 const google_apis::DownloadActionCallback& download_action_callback,
597 const google_apis::GetContentCallback& get_content_callback) {
598 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
600 JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
601 new_job->job_info.file_path = virtual_path;
602 new_job->job_info.num_total_bytes = expected_file_size;
603 new_job->context = context;
604 new_job->task = base::Bind(
605 &DriveServiceInterface::DownloadFile,
606 base::Unretained(drive_service_),
609 base::Bind(&JobScheduler::OnDownloadActionJobDone,
610 weak_ptr_factory_.GetWeakPtr(),
611 new_job->job_info.job_id,
612 download_action_callback),
613 get_content_callback,
614 base::Bind(&JobScheduler::UpdateProgress,
615 weak_ptr_factory_.GetWeakPtr(),
616 new_job->job_info.job_id));
617 new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
619 return new_job->job_info.job_id;
622 void JobScheduler::UploadNewFile(
623 const std::string& parent_resource_id,
624 const base::FilePath& drive_file_path,
625 const base::FilePath& local_file_path,
626 const std::string& title,
627 const std::string& content_type,
628 const DriveUploader::UploadNewFileOptions& options,
629 const ClientContext& context,
630 const google_apis::GetResourceEntryCallback& callback) {
631 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
633 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
634 new_job->job_info.file_path = drive_file_path;
635 new_job->context = context;
637 UploadNewFileParams params;
638 params.parent_resource_id = parent_resource_id;
639 params.local_file_path = local_file_path;
640 params.title = title;
641 params.content_type = content_type;
642 params.options = options;
644 ResumeUploadParams resume_params;
645 resume_params.local_file_path = params.local_file_path;
646 resume_params.content_type = params.content_type;
648 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
649 weak_ptr_factory_.GetWeakPtr(),
650 new_job->job_info.job_id,
653 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
654 weak_ptr_factory_.GetWeakPtr(),
655 new_job->job_info.job_id);
656 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
657 new_job->abort_callback = CreateErrorRunCallback(callback);
661 void JobScheduler::UploadExistingFile(
662 const std::string& resource_id,
663 const base::FilePath& drive_file_path,
664 const base::FilePath& local_file_path,
665 const std::string& content_type,
666 const DriveUploader::UploadExistingFileOptions& options,
667 const ClientContext& context,
668 const google_apis::GetResourceEntryCallback& callback) {
669 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
671 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
672 new_job->job_info.file_path = drive_file_path;
673 new_job->context = context;
675 UploadExistingFileParams params;
676 params.resource_id = resource_id;
677 params.local_file_path = local_file_path;
678 params.content_type = content_type;
679 params.options = options;
681 ResumeUploadParams resume_params;
682 resume_params.local_file_path = params.local_file_path;
683 resume_params.content_type = params.content_type;
685 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
686 weak_ptr_factory_.GetWeakPtr(),
687 new_job->job_info.job_id,
690 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
691 weak_ptr_factory_.GetWeakPtr(),
692 new_job->job_info.job_id);
693 new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
694 new_job->abort_callback = CreateErrorRunCallback(callback);
698 void JobScheduler::GetResourceListInDirectoryByWapi(
699 const std::string& directory_resource_id,
700 const google_apis::GetResourceListCallback& callback) {
701 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
702 DCHECK(!callback.is_null());
704 JobEntry* new_job = CreateNewJob(
705 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI);
706 new_job->task = base::Bind(
707 &DriveServiceInterface::GetResourceListInDirectoryByWapi,
708 base::Unretained(drive_service_),
709 directory_resource_id,
710 base::Bind(&JobScheduler::OnGetResourceListJobDone,
711 weak_ptr_factory_.GetWeakPtr(),
712 new_job->job_info.job_id,
714 new_job->abort_callback = CreateErrorRunCallback(callback);
718 void JobScheduler::GetRemainingResourceList(
719 const GURL& next_link,
720 const google_apis::GetResourceListCallback& callback) {
721 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
722 DCHECK(!callback.is_null());
724 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_RESOURCE_LIST);
725 new_job->task = base::Bind(
726 &DriveServiceInterface::GetRemainingResourceList,
727 base::Unretained(drive_service_),
729 base::Bind(&JobScheduler::OnGetResourceListJobDone,
730 weak_ptr_factory_.GetWeakPtr(),
731 new_job->job_info.job_id,
733 new_job->abort_callback = CreateErrorRunCallback(callback);
737 void JobScheduler::AddPermission(
738 const std::string& resource_id,
739 const std::string& email,
740 google_apis::drive::PermissionRole role,
741 const google_apis::EntryActionCallback& callback) {
742 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
743 DCHECK(!callback.is_null());
745 JobEntry* new_job = CreateNewJob(TYPE_ADD_PERMISSION);
746 new_job->task = base::Bind(&DriveServiceInterface::AddPermission,
747 base::Unretained(drive_service_),
751 base::Bind(&JobScheduler::OnEntryActionJobDone,
752 weak_ptr_factory_.GetWeakPtr(),
753 new_job->job_info.job_id,
755 new_job->abort_callback = callback;
759 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
760 JobEntry* job = new JobEntry(type);
761 job->job_info.job_id = job_map_.Add(job); // Takes the ownership of |job|.
765 void JobScheduler::StartJob(JobEntry* job) {
766 DCHECK(!job->task.is_null());
768 QueueJob(job->job_info.job_id);
769 NotifyJobAdded(job->job_info);
770 DoJobLoop(GetJobQueueType(job->job_info.job_type));
773 void JobScheduler::QueueJob(JobID job_id) {
774 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
776 JobEntry* job_entry = job_map_.Lookup(job_id);
778 const JobInfo& job_info = job_entry->job_info;
780 QueueType queue_type = GetJobQueueType(job_info.job_type);
781 queue_[queue_type]->Push(job_id, job_entry->context.type);
783 const std::string retry_prefix = job_entry->retry_count > 0 ?
784 base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
785 logger_->Log(logging::LOG_INFO,
786 "Job queued%s: %s - %s",
787 retry_prefix.c_str(),
788 job_info.ToString().c_str(),
789 GetQueueInfo(queue_type).c_str());
792 void JobScheduler::DoJobLoop(QueueType queue_type) {
793 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
795 const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
797 // Abort all USER_INITAITED jobs when not accepted.
798 if (accepted_priority < USER_INITIATED) {
799 std::vector<JobID> jobs;
800 queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
801 for (size_t i = 0; i < jobs.size(); ++i) {
802 JobEntry* job = job_map_.Lookup(jobs[i]);
804 AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
808 // Wait when throttled.
809 const base::Time now = base::Time::Now();
810 if (now < wait_until_) {
811 base::MessageLoopProxy::current()->PostDelayedTask(
813 base::Bind(&JobScheduler::DoJobLoop,
814 weak_ptr_factory_.GetWeakPtr(),
820 // Run the job with the highest priority in the queue.
822 if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
825 JobEntry* entry = job_map_.Lookup(job_id);
828 JobInfo* job_info = &entry->job_info;
829 job_info->state = STATE_RUNNING;
830 job_info->start_time = now;
831 NotifyJobUpdated(*job_info);
833 entry->cancel_callback = entry->task.Run();
837 logger_->Log(logging::LOG_INFO,
838 "Job started: %s - %s",
839 job_info->ToString().c_str(),
840 GetQueueInfo(queue_type).c_str());
843 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
844 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
846 const int kNoJobShouldRun = -1;
848 // Should stop if Drive was disabled while running the fetch loop.
849 if (pref_service_->GetBoolean(prefs::kDisableDrive))
850 return kNoJobShouldRun;
852 // Should stop if the network is not online.
853 if (net::NetworkChangeNotifier::IsOffline())
854 return kNoJobShouldRun;
856 // For the file queue, if it is on cellular network, only user initiated
857 // operations are allowed to start.
858 if (queue_type == FILE_QUEUE &&
859 pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
860 net::NetworkChangeNotifier::IsConnectionCellular(
861 net::NetworkChangeNotifier::GetConnectionType()))
862 return USER_INITIATED;
864 // Otherwise, every operations including background tasks are allowed.
868 void JobScheduler::UpdateWait() {
869 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
871 if (disable_throttling_ || throttle_count_ == 0)
874 // Exponential backoff: https://developers.google.com/drive/handle-errors.
875 base::TimeDelta delay =
876 base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
877 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
878 VLOG(1) << "Throttling for " << delay.InMillisecondsF();
880 wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
883 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
884 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
886 JobEntry* job_entry = job_map_.Lookup(job_id);
888 JobInfo* job_info = &job_entry->job_info;
889 QueueType queue_type = GetJobQueueType(job_info->job_type);
890 queue_[queue_type]->MarkFinished(job_id);
892 const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
893 bool success = (GDataToFileError(error) == FILE_ERROR_OK);
894 logger_->Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
895 "Job done: %s => %s (elapsed time: %sms) - %s",
896 job_info->ToString().c_str(),
897 GDataErrorCodeToString(error).c_str(),
898 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
899 GetQueueInfo(queue_type).c_str());
901 // Retry, depending on the error.
902 const bool is_server_error =
903 error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
904 error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
905 if (is_server_error) {
906 if (throttle_count_ < kMaxThrottleCount)
913 const bool should_retry =
914 is_server_error && job_entry->retry_count < kMaxRetryCount;
916 job_entry->cancel_callback.Reset();
917 job_info->state = STATE_RETRY;
918 NotifyJobUpdated(*job_info);
920 ++job_entry->retry_count;
925 NotifyJobDone(*job_info, error);
926 // The job has finished, no retry will happen in the scheduler. Now we can
927 // remove the job info from the map.
928 job_map_.Remove(job_id);
931 // Post a task to continue the job loop. This allows us to finish handling
932 // the current job before starting the next one.
933 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
934 base::Bind(&JobScheduler::DoJobLoop,
935 weak_ptr_factory_.GetWeakPtr(),
937 return !should_retry;
940 void JobScheduler::OnGetResourceListJobDone(
942 const google_apis::GetResourceListCallback& callback,
943 google_apis::GDataErrorCode error,
944 scoped_ptr<google_apis::ResourceList> resource_list) {
945 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
946 DCHECK(!callback.is_null());
948 if (OnJobDone(job_id, error))
949 callback.Run(error, resource_list.Pass());
952 void JobScheduler::OnGetResourceEntryJobDone(
954 const google_apis::GetResourceEntryCallback& callback,
955 google_apis::GDataErrorCode error,
956 scoped_ptr<google_apis::ResourceEntry> entry) {
957 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
958 DCHECK(!callback.is_null());
960 if (OnJobDone(job_id, error))
961 callback.Run(error, entry.Pass());
964 void JobScheduler::OnGetAboutResourceJobDone(
966 const google_apis::AboutResourceCallback& callback,
967 google_apis::GDataErrorCode error,
968 scoped_ptr<google_apis::AboutResource> about_resource) {
969 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
970 DCHECK(!callback.is_null());
972 if (OnJobDone(job_id, error))
973 callback.Run(error, about_resource.Pass());
976 void JobScheduler::OnGetShareUrlJobDone(
978 const google_apis::GetShareUrlCallback& callback,
979 google_apis::GDataErrorCode error,
980 const GURL& share_url) {
981 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
982 DCHECK(!callback.is_null());
984 if (OnJobDone(job_id, error))
985 callback.Run(error, share_url);
988 void JobScheduler::OnGetAppListJobDone(
990 const google_apis::AppListCallback& callback,
991 google_apis::GDataErrorCode error,
992 scoped_ptr<google_apis::AppList> app_list) {
993 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
994 DCHECK(!callback.is_null());
996 if (OnJobDone(job_id, error))
997 callback.Run(error, app_list.Pass());
1000 void JobScheduler::OnEntryActionJobDone(
1002 const google_apis::EntryActionCallback& callback,
1003 google_apis::GDataErrorCode error) {
1004 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1005 DCHECK(!callback.is_null());
1007 if (OnJobDone(job_id, error))
1008 callback.Run(error);
1011 void JobScheduler::OnDownloadActionJobDone(
1013 const google_apis::DownloadActionCallback& callback,
1014 google_apis::GDataErrorCode error,
1015 const base::FilePath& temp_file) {
1016 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1017 DCHECK(!callback.is_null());
1019 if (OnJobDone(job_id, error))
1020 callback.Run(error, temp_file);
1023 void JobScheduler::OnUploadCompletionJobDone(
1025 const ResumeUploadParams& resume_params,
1026 const google_apis::GetResourceEntryCallback& callback,
1027 google_apis::GDataErrorCode error,
1028 const GURL& upload_location,
1029 scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1030 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1031 DCHECK(!callback.is_null());
1033 if (!upload_location.is_empty()) {
1034 // If upload_location is available, update the task to resume the
1035 // upload process from the terminated point.
1036 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1037 // so OnJobDone called below will be in charge to re-queue the job.
1038 JobEntry* job_entry = job_map_.Lookup(job_id);
1041 ResumeUploadFileParams params;
1042 params.upload_location = upload_location;
1043 params.local_file_path = resume_params.local_file_path;
1044 params.content_type = resume_params.content_type;
1045 params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1046 weak_ptr_factory_.GetWeakPtr(),
1050 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1051 weak_ptr_factory_.GetWeakPtr(),
1053 job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1056 if (OnJobDone(job_id, error))
1057 callback.Run(error, resource_entry.Pass());
1060 void JobScheduler::OnResumeUploadFileDone(
1062 const base::Callback<google_apis::CancelCallback()>& original_task,
1063 const google_apis::GetResourceEntryCallback& callback,
1064 google_apis::GDataErrorCode error,
1065 const GURL& upload_location,
1066 scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1067 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1068 DCHECK(!original_task.is_null());
1069 DCHECK(!callback.is_null());
1071 if (upload_location.is_empty()) {
1072 // If upload_location is not available, we should discard it and stop trying
1073 // to resume. Restore the original task.
1074 JobEntry* job_entry = job_map_.Lookup(job_id);
1076 job_entry->task = original_task;
1079 if (OnJobDone(job_id, error))
1080 callback.Run(error, resource_entry.Pass());
1083 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1084 JobEntry* job_entry = job_map_.Lookup(job_id);
1087 job_entry->job_info.num_completed_bytes = progress;
1089 job_entry->job_info.num_total_bytes = total;
1090 NotifyJobUpdated(job_entry->job_info);
1093 void JobScheduler::OnConnectionTypeChanged(
1094 net::NetworkChangeNotifier::ConnectionType type) {
1095 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1097 // Resume the job loop.
1098 // Note that we don't need to check the network connection status as it will
1099 // be checked in GetCurrentAcceptedPriority().
1100 for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1101 DoJobLoop(static_cast<QueueType>(i));
1104 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1106 case TYPE_GET_ABOUT_RESOURCE:
1107 case TYPE_GET_APP_LIST:
1108 case TYPE_GET_ALL_RESOURCE_LIST:
1109 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1111 case TYPE_GET_CHANGE_LIST:
1112 case TYPE_GET_REMAINING_CHANGE_LIST:
1113 case TYPE_GET_REMAINING_FILE_LIST:
1114 case TYPE_GET_RESOURCE_ENTRY:
1115 case TYPE_GET_SHARE_URL:
1116 case TYPE_TRASH_RESOURCE:
1117 case TYPE_COPY_RESOURCE:
1118 case TYPE_UPDATE_RESOURCE:
1119 case TYPE_RENAME_RESOURCE:
1120 case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1121 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1122 case TYPE_ADD_NEW_DIRECTORY:
1123 case TYPE_CREATE_FILE:
1124 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI:
1125 case TYPE_GET_REMAINING_RESOURCE_LIST:
1126 case TYPE_ADD_PERMISSION:
1127 return METADATA_QUEUE;
1129 case TYPE_DOWNLOAD_FILE:
1130 case TYPE_UPLOAD_NEW_FILE:
1131 case TYPE_UPLOAD_EXISTING_FILE:
1138 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1139 google_apis::GDataErrorCode error) {
1140 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1142 const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1143 const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1144 logger_->Log(logging::LOG_INFO,
1145 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1146 job->job_info.ToString().c_str(),
1147 GDataErrorCodeToString(error).c_str(),
1148 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1149 GetQueueInfo(queue_type).c_str());
1151 base::Callback<void(google_apis::GDataErrorCode)> callback =
1152 job->abort_callback;
1153 queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1154 NotifyJobDone(job->job_info, error);
1155 job_map_.Remove(job->job_info.job_id);
1156 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1157 base::Bind(callback, error));
1160 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1161 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1162 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1165 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1166 google_apis::GDataErrorCode error) {
1167 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1168 FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1169 OnJobDone(job_info, GDataToFileError(error)));
1172 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1173 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1174 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1177 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1178 return QueueTypeToString(type) + " " + queue_[type]->ToString();
1182 std::string JobScheduler::QueueTypeToString(QueueType type) {
1184 case METADATA_QUEUE:
1185 return "METADATA_QUEUE";
1187 return "FILE_QUEUE";
1189 break; // This value is just a sentinel. Should never be used.
1195 } // namespace drive