1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/chromeos/drive/file_system_util.h"
13 #include "chrome/browser/drive/event_logger.h"
14 #include "chrome/common/pref_names.h"
15 #include "content/public/browser/browser_thread.h"
16 #include "google_apis/drive/drive_api_parser.h"
18 using content::BrowserThread;
24 // All jobs are retried at maximum of kMaxRetryCount when they fail due to
25 // throttling or server error. The delay before retrying a job is shared among
26 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds.
28 // According to the API documentation, kMaxRetryCount should be the same as
29 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
30 // But currently multiplied by 2 to ensure upload related jobs retried for a
31 // sufficient number of times. crbug.com/269918
32 const int kMaxThrottleCount = 4;
33 const int kMaxRetryCount = 2 * kMaxThrottleCount;
35 // GetDefaultValue returns a value constructed by the default constructor.
36 template<typename T> struct DefaultValueCreator {
37 static T GetDefaultValue() { return T(); }
39 template<typename T> struct DefaultValueCreator<const T&> {
40 static T GetDefaultValue() { return T(); }
43 // Helper of CreateErrorRunCallback implementation.
45 // - ResultType; the type of the Callback which should be returned by
46 // CreateErrorRunCallback.
47 // - Run(): a static function which takes the original |callback| and |error|,
48 // and runs the |callback|.Run() with the error code and default values
49 // for remaining arguments.
50 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
52 // CreateErrorRunCallback with two arguments.
54 struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> {
56 const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback,
57 google_apis::GDataErrorCode error) {
58 callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
62 // Returns a callback with the tail parameter bound to its default value.
63 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
64 template<typename CallbackType>
65 base::Callback<void(google_apis::GDataErrorCode)>
66 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
67 return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
70 // Parameter struct for RunUploadNewFile.
71 struct UploadNewFileParams {
72 std::string parent_resource_id;
73 base::FilePath local_file_path;
75 std::string content_type;
76 DriveUploader::UploadNewFileOptions options;
77 UploadCompletionCallback callback;
78 google_apis::ProgressCallback progress_callback;
81 // Helper function to work around the arity limitation of base::Bind.
82 google_apis::CancelCallback RunUploadNewFile(
83 DriveUploaderInterface* uploader,
84 const UploadNewFileParams& params) {
85 return uploader->UploadNewFile(params.parent_resource_id,
86 params.local_file_path,
91 params.progress_callback);
94 // Parameter struct for RunUploadExistingFile.
95 struct UploadExistingFileParams {
96 std::string resource_id;
97 base::FilePath local_file_path;
98 std::string content_type;
99 DriveUploader::UploadExistingFileOptions options;
101 UploadCompletionCallback callback;
102 google_apis::ProgressCallback progress_callback;
105 // Helper function to work around the arity limitation of base::Bind.
106 google_apis::CancelCallback RunUploadExistingFile(
107 DriveUploaderInterface* uploader,
108 const UploadExistingFileParams& params) {
109 return uploader->UploadExistingFile(params.resource_id,
110 params.local_file_path,
114 params.progress_callback);
117 // Parameter struct for RunResumeUploadFile.
118 struct ResumeUploadFileParams {
119 GURL upload_location;
120 base::FilePath local_file_path;
121 std::string content_type;
122 UploadCompletionCallback callback;
123 google_apis::ProgressCallback progress_callback;
126 // Helper function to adjust the return type.
127 google_apis::CancelCallback RunResumeUploadFile(
128 DriveUploaderInterface* uploader,
129 const ResumeUploadFileParams& params) {
130 return uploader->ResumeUploadFile(params.upload_location,
131 params.local_file_path,
134 params.progress_callback);
139 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
140 const int JobScheduler::kMaxJobCount[] = {
145 JobScheduler::JobEntry::JobEntry(JobType type)
147 context(ClientContext(USER_INITIATED)),
149 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
152 JobScheduler::JobEntry::~JobEntry() {
153 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
156 struct JobScheduler::ResumeUploadParams {
157 base::FilePath drive_file_path;
158 base::FilePath local_file_path;
159 std::string content_type;
162 JobScheduler::JobScheduler(
163 PrefService* pref_service,
165 DriveServiceInterface* drive_service,
166 base::SequencedTaskRunner* blocking_task_runner)
167 : throttle_count_(0),
168 wait_until_(base::Time::Now()),
169 disable_throttling_(false),
171 drive_service_(drive_service),
172 uploader_(new DriveUploader(drive_service, blocking_task_runner)),
173 pref_service_(pref_service),
174 weak_ptr_factory_(this) {
175 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
177 for (int i = 0; i < NUM_QUEUES; ++i)
178 queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
180 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
183 JobScheduler::~JobScheduler() {
184 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
186 size_t num_queued_jobs = 0;
187 for (int i = 0; i < NUM_QUEUES; ++i)
188 num_queued_jobs += queue_[i]->GetNumberOfJobs();
189 DCHECK_EQ(num_queued_jobs, job_map_.size());
191 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
194 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
195 std::vector<JobInfo> job_info_list;
196 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
197 job_info_list.push_back(iter.GetCurrentValue()->job_info);
198 return job_info_list;
201 void JobScheduler::AddObserver(JobListObserver* observer) {
202 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
203 observer_list_.AddObserver(observer);
206 void JobScheduler::RemoveObserver(JobListObserver* observer) {
207 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
208 observer_list_.RemoveObserver(observer);
211 void JobScheduler::CancelJob(JobID job_id) {
212 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
214 JobEntry* job = job_map_.Lookup(job_id);
216 if (job->job_info.state == STATE_RUNNING) {
217 // If the job is running an HTTP request, cancel it via |cancel_callback|
218 // returned from the request, and wait for termination in the normal
219 // callback handler, OnJobDone.
220 if (!job->cancel_callback.is_null())
221 job->cancel_callback.Run();
223 AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
228 void JobScheduler::CancelAllJobs() {
229 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
231 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
232 // removable during iteration.
233 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
234 CancelJob(iter.GetCurrentKey());
237 void JobScheduler::GetAboutResource(
238 const google_apis::AboutResourceCallback& callback) {
239 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
240 DCHECK(!callback.is_null());
242 JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
243 new_job->task = base::Bind(
244 &DriveServiceInterface::GetAboutResource,
245 base::Unretained(drive_service_),
246 base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
247 weak_ptr_factory_.GetWeakPtr(),
248 new_job->job_info.job_id,
250 new_job->abort_callback = CreateErrorRunCallback(callback);
254 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
255 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
256 DCHECK(!callback.is_null());
258 JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
259 new_job->task = base::Bind(
260 &DriveServiceInterface::GetAppList,
261 base::Unretained(drive_service_),
262 base::Bind(&JobScheduler::OnGetAppListJobDone,
263 weak_ptr_factory_.GetWeakPtr(),
264 new_job->job_info.job_id,
266 new_job->abort_callback = CreateErrorRunCallback(callback);
270 void JobScheduler::GetAllResourceList(
271 const google_apis::GetResourceListCallback& callback) {
272 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
273 DCHECK(!callback.is_null());
275 JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
276 new_job->task = base::Bind(
277 &DriveServiceInterface::GetAllResourceList,
278 base::Unretained(drive_service_),
279 base::Bind(&JobScheduler::OnGetResourceListJobDone,
280 weak_ptr_factory_.GetWeakPtr(),
281 new_job->job_info.job_id,
283 new_job->abort_callback = CreateErrorRunCallback(callback);
287 void JobScheduler::GetResourceListInDirectory(
288 const std::string& directory_resource_id,
289 const google_apis::GetResourceListCallback& callback) {
290 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
291 DCHECK(!callback.is_null());
293 JobEntry* new_job = CreateNewJob(
294 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
295 new_job->task = base::Bind(
296 &DriveServiceInterface::GetResourceListInDirectory,
297 base::Unretained(drive_service_),
298 directory_resource_id,
299 base::Bind(&JobScheduler::OnGetResourceListJobDone,
300 weak_ptr_factory_.GetWeakPtr(),
301 new_job->job_info.job_id,
303 new_job->abort_callback = CreateErrorRunCallback(callback);
307 void JobScheduler::Search(
308 const std::string& search_query,
309 const google_apis::GetResourceListCallback& callback) {
310 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
311 DCHECK(!callback.is_null());
313 JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
314 new_job->task = base::Bind(
315 &DriveServiceInterface::Search,
316 base::Unretained(drive_service_),
318 base::Bind(&JobScheduler::OnGetResourceListJobDone,
319 weak_ptr_factory_.GetWeakPtr(),
320 new_job->job_info.job_id,
322 new_job->abort_callback = CreateErrorRunCallback(callback);
326 void JobScheduler::GetChangeList(
327 int64 start_changestamp,
328 const google_apis::GetResourceListCallback& callback) {
329 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
330 DCHECK(!callback.is_null());
332 JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
333 new_job->task = base::Bind(
334 &DriveServiceInterface::GetChangeList,
335 base::Unretained(drive_service_),
337 base::Bind(&JobScheduler::OnGetResourceListJobDone,
338 weak_ptr_factory_.GetWeakPtr(),
339 new_job->job_info.job_id,
341 new_job->abort_callback = CreateErrorRunCallback(callback);
345 void JobScheduler::GetRemainingChangeList(
346 const GURL& next_link,
347 const google_apis::GetResourceListCallback& callback) {
348 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
349 DCHECK(!callback.is_null());
351 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
352 new_job->task = base::Bind(
353 &DriveServiceInterface::GetRemainingChangeList,
354 base::Unretained(drive_service_),
356 base::Bind(&JobScheduler::OnGetResourceListJobDone,
357 weak_ptr_factory_.GetWeakPtr(),
358 new_job->job_info.job_id,
360 new_job->abort_callback = CreateErrorRunCallback(callback);
364 void JobScheduler::GetRemainingFileList(
365 const GURL& next_link,
366 const google_apis::GetResourceListCallback& callback) {
367 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
368 DCHECK(!callback.is_null());
370 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
371 new_job->task = base::Bind(
372 &DriveServiceInterface::GetRemainingFileList,
373 base::Unretained(drive_service_),
375 base::Bind(&JobScheduler::OnGetResourceListJobDone,
376 weak_ptr_factory_.GetWeakPtr(),
377 new_job->job_info.job_id,
379 new_job->abort_callback = CreateErrorRunCallback(callback);
383 void JobScheduler::GetResourceEntry(
384 const std::string& resource_id,
385 const ClientContext& context,
386 const google_apis::GetResourceEntryCallback& callback) {
387 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
388 DCHECK(!callback.is_null());
390 JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
391 new_job->context = context;
392 new_job->task = base::Bind(
393 &DriveServiceInterface::GetResourceEntry,
394 base::Unretained(drive_service_),
396 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
397 weak_ptr_factory_.GetWeakPtr(),
398 new_job->job_info.job_id,
400 new_job->abort_callback = CreateErrorRunCallback(callback);
404 void JobScheduler::GetShareUrl(
405 const std::string& resource_id,
406 const GURL& embed_origin,
407 const ClientContext& context,
408 const google_apis::GetShareUrlCallback& callback) {
409 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
410 DCHECK(!callback.is_null());
412 JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
413 new_job->context = context;
414 new_job->task = base::Bind(
415 &DriveServiceInterface::GetShareUrl,
416 base::Unretained(drive_service_),
419 base::Bind(&JobScheduler::OnGetShareUrlJobDone,
420 weak_ptr_factory_.GetWeakPtr(),
421 new_job->job_info.job_id,
423 new_job->abort_callback = CreateErrorRunCallback(callback);
427 void JobScheduler::TrashResource(
428 const std::string& resource_id,
429 const ClientContext& context,
430 const google_apis::EntryActionCallback& callback) {
431 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
432 DCHECK(!callback.is_null());
434 JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
435 new_job->context = context;
436 new_job->task = base::Bind(
437 &DriveServiceInterface::TrashResource,
438 base::Unretained(drive_service_),
440 base::Bind(&JobScheduler::OnEntryActionJobDone,
441 weak_ptr_factory_.GetWeakPtr(),
442 new_job->job_info.job_id,
444 new_job->abort_callback = callback;
448 void JobScheduler::CopyResource(
449 const std::string& resource_id,
450 const std::string& parent_resource_id,
451 const std::string& new_title,
452 const base::Time& last_modified,
453 const google_apis::GetResourceEntryCallback& callback) {
454 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
455 DCHECK(!callback.is_null());
457 JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
458 new_job->task = base::Bind(
459 &DriveServiceInterface::CopyResource,
460 base::Unretained(drive_service_),
465 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
466 weak_ptr_factory_.GetWeakPtr(),
467 new_job->job_info.job_id,
469 new_job->abort_callback = CreateErrorRunCallback(callback);
473 void JobScheduler::UpdateResource(
474 const std::string& resource_id,
475 const std::string& parent_resource_id,
476 const std::string& new_title,
477 const base::Time& last_modified,
478 const base::Time& last_viewed_by_me,
479 const ClientContext& context,
480 const google_apis::GetResourceEntryCallback& callback) {
481 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
482 DCHECK(!callback.is_null());
484 JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
485 new_job->context = context;
486 new_job->task = base::Bind(
487 &DriveServiceInterface::UpdateResource,
488 base::Unretained(drive_service_),
494 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
495 weak_ptr_factory_.GetWeakPtr(),
496 new_job->job_info.job_id,
498 new_job->abort_callback = CreateErrorRunCallback(callback);
502 void JobScheduler::RenameResource(
503 const std::string& resource_id,
504 const std::string& new_title,
505 const google_apis::EntryActionCallback& callback) {
506 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
507 DCHECK(!callback.is_null());
509 JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
510 new_job->task = base::Bind(
511 &DriveServiceInterface::RenameResource,
512 base::Unretained(drive_service_),
515 base::Bind(&JobScheduler::OnEntryActionJobDone,
516 weak_ptr_factory_.GetWeakPtr(),
517 new_job->job_info.job_id,
519 new_job->abort_callback = callback;
523 void JobScheduler::AddResourceToDirectory(
524 const std::string& parent_resource_id,
525 const std::string& resource_id,
526 const google_apis::EntryActionCallback& callback) {
527 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
528 DCHECK(!callback.is_null());
530 JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
531 new_job->task = base::Bind(
532 &DriveServiceInterface::AddResourceToDirectory,
533 base::Unretained(drive_service_),
536 base::Bind(&JobScheduler::OnEntryActionJobDone,
537 weak_ptr_factory_.GetWeakPtr(),
538 new_job->job_info.job_id,
540 new_job->abort_callback = callback;
544 void JobScheduler::RemoveResourceFromDirectory(
545 const std::string& parent_resource_id,
546 const std::string& resource_id,
547 const ClientContext& context,
548 const google_apis::EntryActionCallback& callback) {
549 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
551 JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
552 new_job->context = context;
553 new_job->task = base::Bind(
554 &DriveServiceInterface::RemoveResourceFromDirectory,
555 base::Unretained(drive_service_),
558 base::Bind(&JobScheduler::OnEntryActionJobDone,
559 weak_ptr_factory_.GetWeakPtr(),
560 new_job->job_info.job_id,
562 new_job->abort_callback = callback;
566 void JobScheduler::AddNewDirectory(
567 const std::string& parent_resource_id,
568 const std::string& directory_title,
569 const DriveServiceInterface::AddNewDirectoryOptions& options,
570 const ClientContext& context,
571 const google_apis::GetResourceEntryCallback& callback) {
572 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
574 JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
575 new_job->context = context;
576 new_job->task = base::Bind(
577 &DriveServiceInterface::AddNewDirectory,
578 base::Unretained(drive_service_),
582 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
583 weak_ptr_factory_.GetWeakPtr(),
584 new_job->job_info.job_id,
586 new_job->abort_callback = CreateErrorRunCallback(callback);
590 JobID JobScheduler::DownloadFile(
591 const base::FilePath& virtual_path,
592 int64 expected_file_size,
593 const base::FilePath& local_cache_path,
594 const std::string& resource_id,
595 const ClientContext& context,
596 const google_apis::DownloadActionCallback& download_action_callback,
597 const google_apis::GetContentCallback& get_content_callback) {
598 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
600 JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
601 new_job->job_info.file_path = virtual_path;
602 new_job->job_info.num_total_bytes = expected_file_size;
603 new_job->context = context;
604 new_job->task = base::Bind(
605 &DriveServiceInterface::DownloadFile,
606 base::Unretained(drive_service_),
609 base::Bind(&JobScheduler::OnDownloadActionJobDone,
610 weak_ptr_factory_.GetWeakPtr(),
611 new_job->job_info.job_id,
612 download_action_callback),
613 get_content_callback,
614 base::Bind(&JobScheduler::UpdateProgress,
615 weak_ptr_factory_.GetWeakPtr(),
616 new_job->job_info.job_id));
617 new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
619 return new_job->job_info.job_id;
622 void JobScheduler::UploadNewFile(
623 const std::string& parent_resource_id,
624 const base::FilePath& drive_file_path,
625 const base::FilePath& local_file_path,
626 const std::string& title,
627 const std::string& content_type,
628 const DriveUploader::UploadNewFileOptions& options,
629 const ClientContext& context,
630 const google_apis::GetResourceEntryCallback& callback) {
631 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
633 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
634 new_job->job_info.file_path = drive_file_path;
635 new_job->context = context;
637 UploadNewFileParams params;
638 params.parent_resource_id = parent_resource_id;
639 params.local_file_path = local_file_path;
640 params.title = title;
641 params.content_type = content_type;
642 params.options = options;
644 ResumeUploadParams resume_params;
645 resume_params.local_file_path = params.local_file_path;
646 resume_params.content_type = params.content_type;
648 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
649 weak_ptr_factory_.GetWeakPtr(),
650 new_job->job_info.job_id,
653 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
654 weak_ptr_factory_.GetWeakPtr(),
655 new_job->job_info.job_id);
656 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
657 new_job->abort_callback = CreateErrorRunCallback(callback);
661 void JobScheduler::UploadExistingFile(
662 const std::string& resource_id,
663 const base::FilePath& drive_file_path,
664 const base::FilePath& local_file_path,
665 const std::string& content_type,
666 const DriveUploader::UploadExistingFileOptions& options,
667 const ClientContext& context,
668 const google_apis::GetResourceEntryCallback& callback) {
669 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
671 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
672 new_job->job_info.file_path = drive_file_path;
673 new_job->context = context;
675 UploadExistingFileParams params;
676 params.resource_id = resource_id;
677 params.local_file_path = local_file_path;
678 params.content_type = content_type;
679 params.options = options;
681 ResumeUploadParams resume_params;
682 resume_params.local_file_path = params.local_file_path;
683 resume_params.content_type = params.content_type;
685 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
686 weak_ptr_factory_.GetWeakPtr(),
687 new_job->job_info.job_id,
690 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
691 weak_ptr_factory_.GetWeakPtr(),
692 new_job->job_info.job_id);
693 new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
694 new_job->abort_callback = CreateErrorRunCallback(callback);
698 void JobScheduler::GetResourceListInDirectoryByWapi(
699 const std::string& directory_resource_id,
700 const google_apis::GetResourceListCallback& callback) {
701 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
702 DCHECK(!callback.is_null());
704 JobEntry* new_job = CreateNewJob(
705 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI);
706 new_job->task = base::Bind(
707 &DriveServiceInterface::GetResourceListInDirectoryByWapi,
708 base::Unretained(drive_service_),
709 directory_resource_id,
710 base::Bind(&JobScheduler::OnGetResourceListJobDone,
711 weak_ptr_factory_.GetWeakPtr(),
712 new_job->job_info.job_id,
714 new_job->abort_callback = CreateErrorRunCallback(callback);
718 void JobScheduler::GetRemainingResourceList(
719 const GURL& next_link,
720 const google_apis::GetResourceListCallback& callback) {
721 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
722 DCHECK(!callback.is_null());
724 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_RESOURCE_LIST);
725 new_job->task = base::Bind(
726 &DriveServiceInterface::GetRemainingResourceList,
727 base::Unretained(drive_service_),
729 base::Bind(&JobScheduler::OnGetResourceListJobDone,
730 weak_ptr_factory_.GetWeakPtr(),
731 new_job->job_info.job_id,
733 new_job->abort_callback = CreateErrorRunCallback(callback);
737 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
738 JobEntry* job = new JobEntry(type);
739 job->job_info.job_id = job_map_.Add(job); // Takes the ownership of |job|.
743 void JobScheduler::StartJob(JobEntry* job) {
744 DCHECK(!job->task.is_null());
746 QueueJob(job->job_info.job_id);
747 NotifyJobAdded(job->job_info);
748 DoJobLoop(GetJobQueueType(job->job_info.job_type));
751 void JobScheduler::QueueJob(JobID job_id) {
752 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
754 JobEntry* job_entry = job_map_.Lookup(job_id);
756 const JobInfo& job_info = job_entry->job_info;
758 QueueType queue_type = GetJobQueueType(job_info.job_type);
759 queue_[queue_type]->Push(job_id, job_entry->context.type);
761 const std::string retry_prefix = job_entry->retry_count > 0 ?
762 base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
763 logger_->Log(logging::LOG_INFO,
764 "Job queued%s: %s - %s",
765 retry_prefix.c_str(),
766 job_info.ToString().c_str(),
767 GetQueueInfo(queue_type).c_str());
770 void JobScheduler::DoJobLoop(QueueType queue_type) {
771 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
773 const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
775 // Abort all USER_INITAITED jobs when not accepted.
776 if (accepted_priority < USER_INITIATED) {
777 std::vector<JobID> jobs;
778 queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
779 for (size_t i = 0; i < jobs.size(); ++i) {
780 JobEntry* job = job_map_.Lookup(jobs[i]);
782 AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
786 // Wait when throttled.
787 const base::Time now = base::Time::Now();
788 if (now < wait_until_) {
789 base::MessageLoopProxy::current()->PostDelayedTask(
791 base::Bind(&JobScheduler::DoJobLoop,
792 weak_ptr_factory_.GetWeakPtr(),
798 // Run the job with the highest priority in the queue.
800 if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
803 JobEntry* entry = job_map_.Lookup(job_id);
806 JobInfo* job_info = &entry->job_info;
807 job_info->state = STATE_RUNNING;
808 job_info->start_time = now;
809 NotifyJobUpdated(*job_info);
811 entry->cancel_callback = entry->task.Run();
815 logger_->Log(logging::LOG_INFO,
816 "Job started: %s - %s",
817 job_info->ToString().c_str(),
818 GetQueueInfo(queue_type).c_str());
821 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
822 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
824 const int kNoJobShouldRun = -1;
826 // Should stop if Drive was disabled while running the fetch loop.
827 if (pref_service_->GetBoolean(prefs::kDisableDrive))
828 return kNoJobShouldRun;
830 // Should stop if the network is not online.
831 if (net::NetworkChangeNotifier::IsOffline())
832 return kNoJobShouldRun;
834 // For the file queue, if it is on cellular network, only user initiated
835 // operations are allowed to start.
836 if (queue_type == FILE_QUEUE &&
837 pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
838 net::NetworkChangeNotifier::IsConnectionCellular(
839 net::NetworkChangeNotifier::GetConnectionType()))
840 return USER_INITIATED;
842 // Otherwise, every operations including background tasks are allowed.
846 void JobScheduler::UpdateWait() {
847 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
849 if (disable_throttling_ || throttle_count_ == 0)
852 // Exponential backoff: https://developers.google.com/drive/handle-errors.
853 base::TimeDelta delay =
854 base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
855 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
856 VLOG(1) << "Throttling for " << delay.InMillisecondsF();
858 wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
861 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
862 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
864 JobEntry* job_entry = job_map_.Lookup(job_id);
866 JobInfo* job_info = &job_entry->job_info;
867 QueueType queue_type = GetJobQueueType(job_info->job_type);
868 queue_[queue_type]->MarkFinished(job_id);
870 const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
871 bool success = (GDataToFileError(error) == FILE_ERROR_OK);
872 logger_->Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
873 "Job done: %s => %s (elapsed time: %sms) - %s",
874 job_info->ToString().c_str(),
875 GDataErrorCodeToString(error).c_str(),
876 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
877 GetQueueInfo(queue_type).c_str());
879 // Retry, depending on the error.
880 const bool is_server_error =
881 error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
882 error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
883 if (is_server_error) {
884 if (throttle_count_ < kMaxThrottleCount)
891 const bool should_retry =
892 is_server_error && job_entry->retry_count < kMaxRetryCount;
894 job_entry->cancel_callback.Reset();
895 job_info->state = STATE_RETRY;
896 NotifyJobUpdated(*job_info);
898 ++job_entry->retry_count;
903 NotifyJobDone(*job_info, error);
904 // The job has finished, no retry will happen in the scheduler. Now we can
905 // remove the job info from the map.
906 job_map_.Remove(job_id);
909 // Post a task to continue the job loop. This allows us to finish handling
910 // the current job before starting the next one.
911 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
912 base::Bind(&JobScheduler::DoJobLoop,
913 weak_ptr_factory_.GetWeakPtr(),
915 return !should_retry;
918 void JobScheduler::OnGetResourceListJobDone(
920 const google_apis::GetResourceListCallback& callback,
921 google_apis::GDataErrorCode error,
922 scoped_ptr<google_apis::ResourceList> resource_list) {
923 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
924 DCHECK(!callback.is_null());
926 if (OnJobDone(job_id, error))
927 callback.Run(error, resource_list.Pass());
930 void JobScheduler::OnGetResourceEntryJobDone(
932 const google_apis::GetResourceEntryCallback& callback,
933 google_apis::GDataErrorCode error,
934 scoped_ptr<google_apis::ResourceEntry> entry) {
935 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
936 DCHECK(!callback.is_null());
938 if (OnJobDone(job_id, error))
939 callback.Run(error, entry.Pass());
942 void JobScheduler::OnGetAboutResourceJobDone(
944 const google_apis::AboutResourceCallback& callback,
945 google_apis::GDataErrorCode error,
946 scoped_ptr<google_apis::AboutResource> about_resource) {
947 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
948 DCHECK(!callback.is_null());
950 if (OnJobDone(job_id, error))
951 callback.Run(error, about_resource.Pass());
954 void JobScheduler::OnGetShareUrlJobDone(
956 const google_apis::GetShareUrlCallback& callback,
957 google_apis::GDataErrorCode error,
958 const GURL& share_url) {
959 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
960 DCHECK(!callback.is_null());
962 if (OnJobDone(job_id, error))
963 callback.Run(error, share_url);
966 void JobScheduler::OnGetAppListJobDone(
968 const google_apis::AppListCallback& callback,
969 google_apis::GDataErrorCode error,
970 scoped_ptr<google_apis::AppList> app_list) {
971 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
972 DCHECK(!callback.is_null());
974 if (OnJobDone(job_id, error))
975 callback.Run(error, app_list.Pass());
978 void JobScheduler::OnEntryActionJobDone(
980 const google_apis::EntryActionCallback& callback,
981 google_apis::GDataErrorCode error) {
982 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
983 DCHECK(!callback.is_null());
985 if (OnJobDone(job_id, error))
989 void JobScheduler::OnDownloadActionJobDone(
991 const google_apis::DownloadActionCallback& callback,
992 google_apis::GDataErrorCode error,
993 const base::FilePath& temp_file) {
994 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
995 DCHECK(!callback.is_null());
997 if (OnJobDone(job_id, error))
998 callback.Run(error, temp_file);
1001 void JobScheduler::OnUploadCompletionJobDone(
1003 const ResumeUploadParams& resume_params,
1004 const google_apis::GetResourceEntryCallback& callback,
1005 google_apis::GDataErrorCode error,
1006 const GURL& upload_location,
1007 scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1008 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1009 DCHECK(!callback.is_null());
1011 if (!upload_location.is_empty()) {
1012 // If upload_location is available, update the task to resume the
1013 // upload process from the terminated point.
1014 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1015 // so OnJobDone called below will be in charge to re-queue the job.
1016 JobEntry* job_entry = job_map_.Lookup(job_id);
1019 ResumeUploadFileParams params;
1020 params.upload_location = upload_location;
1021 params.local_file_path = resume_params.local_file_path;
1022 params.content_type = resume_params.content_type;
1023 params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1024 weak_ptr_factory_.GetWeakPtr(),
1028 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1029 weak_ptr_factory_.GetWeakPtr(),
1031 job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1034 if (OnJobDone(job_id, error))
1035 callback.Run(error, resource_entry.Pass());
1038 void JobScheduler::OnResumeUploadFileDone(
1040 const base::Callback<google_apis::CancelCallback()>& original_task,
1041 const google_apis::GetResourceEntryCallback& callback,
1042 google_apis::GDataErrorCode error,
1043 const GURL& upload_location,
1044 scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1045 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1046 DCHECK(!original_task.is_null());
1047 DCHECK(!callback.is_null());
1049 if (upload_location.is_empty()) {
1050 // If upload_location is not available, we should discard it and stop trying
1051 // to resume. Restore the original task.
1052 JobEntry* job_entry = job_map_.Lookup(job_id);
1054 job_entry->task = original_task;
1057 if (OnJobDone(job_id, error))
1058 callback.Run(error, resource_entry.Pass());
1061 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1062 JobEntry* job_entry = job_map_.Lookup(job_id);
1065 job_entry->job_info.num_completed_bytes = progress;
1067 job_entry->job_info.num_total_bytes = total;
1068 NotifyJobUpdated(job_entry->job_info);
1071 void JobScheduler::OnConnectionTypeChanged(
1072 net::NetworkChangeNotifier::ConnectionType type) {
1073 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1075 // Resume the job loop.
1076 // Note that we don't need to check the network connection status as it will
1077 // be checked in GetCurrentAcceptedPriority().
1078 for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1079 DoJobLoop(static_cast<QueueType>(i));
1082 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1084 case TYPE_GET_ABOUT_RESOURCE:
1085 case TYPE_GET_APP_LIST:
1086 case TYPE_GET_ALL_RESOURCE_LIST:
1087 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1089 case TYPE_GET_CHANGE_LIST:
1090 case TYPE_GET_REMAINING_CHANGE_LIST:
1091 case TYPE_GET_REMAINING_FILE_LIST:
1092 case TYPE_GET_RESOURCE_ENTRY:
1093 case TYPE_GET_SHARE_URL:
1094 case TYPE_TRASH_RESOURCE:
1095 case TYPE_COPY_RESOURCE:
1096 case TYPE_UPDATE_RESOURCE:
1097 case TYPE_RENAME_RESOURCE:
1098 case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1099 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1100 case TYPE_ADD_NEW_DIRECTORY:
1101 case TYPE_CREATE_FILE:
1102 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI:
1103 case TYPE_GET_REMAINING_RESOURCE_LIST:
1104 return METADATA_QUEUE;
1106 case TYPE_DOWNLOAD_FILE:
1107 case TYPE_UPLOAD_NEW_FILE:
1108 case TYPE_UPLOAD_EXISTING_FILE:
1115 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1116 google_apis::GDataErrorCode error) {
1117 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1119 const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1120 const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1121 logger_->Log(logging::LOG_INFO,
1122 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1123 job->job_info.ToString().c_str(),
1124 GDataErrorCodeToString(error).c_str(),
1125 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1126 GetQueueInfo(queue_type).c_str());
1128 base::Callback<void(google_apis::GDataErrorCode)> callback =
1129 job->abort_callback;
1130 queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1131 NotifyJobDone(job->job_info, error);
1132 job_map_.Remove(job->job_info.job_id);
1133 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1134 base::Bind(callback, error));
1137 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1138 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1139 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1142 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1143 google_apis::GDataErrorCode error) {
1144 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1145 FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1146 OnJobDone(job_info, GDataToFileError(error)));
1149 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1150 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1151 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1154 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1155 return QueueTypeToString(type) + " " + queue_[type]->ToString();
1159 std::string JobScheduler::QueueTypeToString(QueueType type) {
1161 case METADATA_QUEUE:
1162 return "METADATA_QUEUE";
1164 return "FILE_QUEUE";
1166 break; // This value is just a sentinel. Should never be used.
1172 } // namespace drive