1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/chromeos/drive/file_system_util.h"
13 #include "chrome/browser/chromeos/drive/logging.h"
14 #include "chrome/browser/google_apis/drive_api_parser.h"
15 #include "chrome/browser/google_apis/task_util.h"
16 #include "chrome/common/pref_names.h"
17 #include "content/public/browser/browser_thread.h"
19 using content::BrowserThread;
// Limit that OnJobDone() compares |throttle_count_| against when a server
// error is reported.
25 const int kMaxThrottleCount = 4;
27 // According to the API documentation, this should be the same as
28 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
29 // But it is currently multiplied by 2 to ensure that upload-related jobs are
30 // retried a sufficient number of times. crbug.com/269918
31 const int kMaxRetryCount = 2*kMaxThrottleCount;
33 // Parameter struct for RunUploadNewFile.
// Bundles the arguments of a new-file upload so that they can be passed to
// base::Bind as a single value.
34 struct UploadNewFileParams {
35 std::string parent_resource_id;
36 base::FilePath local_file_path;
38 std::string content_type;
39 UploadCompletionCallback callback;
40 google_apis::ProgressCallback progress_callback;
43 // Helper function to work around the arity limitation of base::Bind.
// Forwards the fields of |params| to |uploader|->UploadNewFile() and returns
// the CancelCallback produced by that call.
44 google_apis::CancelCallback RunUploadNewFile(
45 DriveUploaderInterface* uploader,
46 const UploadNewFileParams& params) {
47 return uploader->UploadNewFile(params.parent_resource_id,
48 params.local_file_path,
52 params.progress_callback);
55 // Parameter struct for RunUploadExistingFile.
// Bundles the arguments of an upload that overwrites an existing resource so
// that they can be passed to base::Bind as a single value.
56 struct UploadExistingFileParams {
57 std::string resource_id;
58 base::FilePath local_file_path;
59 std::string content_type;
61 UploadCompletionCallback callback;
62 google_apis::ProgressCallback progress_callback;
65 // Helper function to work around the arity limitation of base::Bind.
// Forwards the fields of |params| to |uploader|->UploadExistingFile() and
// returns the CancelCallback produced by that call.
66 google_apis::CancelCallback RunUploadExistingFile(
67 DriveUploaderInterface* uploader,
68 const UploadExistingFileParams& params) {
69 return uploader->UploadExistingFile(params.resource_id,
70 params.local_file_path,
74 params.progress_callback);
77 // Parameter struct for RunResumeUploadFile.
// Filled in by JobScheduler::OnUploadCompletionJobDone() when an upload
// location is available to resume from.
78 struct ResumeUploadFileParams {
80 base::FilePath local_file_path;
81 std::string content_type;
82 UploadCompletionCallback callback;
83 google_apis::ProgressCallback progress_callback;
86 // Helper function to adjust the return type.
// Forwards the fields of |params| to |uploader|->ResumeUploadFile() and
// returns the result as a google_apis::CancelCallback.
87 google_apis::CancelCallback RunResumeUploadFile(
88 DriveUploaderInterface* uploader,
89 const ResumeUploadFileParams& params) {
90 return uploader->ResumeUploadFile(params.upload_location,
91 params.local_file_path,
94 params.progress_callback);
// Per-queue values indexed by queue type; the JobScheduler constructor passes
// kMaxJobCount[i] to the JobQueue it creates for queue |i|.
99 const int JobScheduler::kMaxJobCount[] = {
// Creates a job entry for |type| whose client context defaults to
// USER_INITIATED. DCHECKs that it is constructed on the UI thread.
104 JobScheduler::JobEntry::JobEntry(JobType type)
106 context(ClientContext(USER_INITIATED)),
108 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
// DCHECKs that the entry is destroyed on the UI thread.
111 JobScheduler::JobEntry::~JobEntry() {
112 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
// Values recorded when an upload job is created. OnUploadCompletionJobDone()
// copies |local_file_path| and |content_type| into ResumeUploadFileParams when
// it prepares a resume task.
115 struct JobScheduler::ResumeUploadParams {
116 base::FilePath drive_file_path;
117 base::FilePath local_file_path;
118 std::string content_type;
// Initializes the throttling state (no throttling, no pending wait), creates a
// DriveUploader on top of |drive_service| and |blocking_task_runner|, builds
// one JobQueue per queue type and registers this object as a connection-type
// observer. DCHECKs that it runs on the UI thread.
121 JobScheduler::JobScheduler(
122 PrefService* pref_service,
123 DriveServiceInterface* drive_service,
124 base::SequencedTaskRunner* blocking_task_runner)
125 : throttle_count_(0),
126 wait_until_(base::Time::Now()),
127 disable_throttling_(false),
128 drive_service_(drive_service),
129 uploader_(new DriveUploader(drive_service, blocking_task_runner)),
130 pref_service_(pref_service),
131 weak_ptr_factory_(this) {
132 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
// Each queue gets its own job-count value and one slot per context type.
134 for (int i = 0; i < NUM_QUEUES; ++i)
135 queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
137 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
// DCHECKs that the total number of jobs held by the queues equals the number
// of entries in |job_map_|, then unregisters the connection-type observer.
140 JobScheduler::~JobScheduler() {
141 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
143 size_t num_queued_jobs = 0;
144 for (int i = 0; i < NUM_QUEUES; ++i)
145 num_queued_jobs += queue_[i]->GetNumberOfJobs();
146 DCHECK_EQ(num_queued_jobs, job_map_.size());
148 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
// Returns a copy of the JobInfo of every entry currently held in |job_map_|.
151 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
152 std::vector<JobInfo> job_info_list;
153 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
154 job_info_list.push_back(iter.GetCurrentValue()->job_info);
155 return job_info_list;
// Adds |observer| to |observer_list_|. DCHECKs the UI thread.
158 void JobScheduler::AddObserver(JobListObserver* observer) {
159 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
160 observer_list_.AddObserver(observer);
// Removes |observer| from |observer_list_|. DCHECKs the UI thread.
163 void JobScheduler::RemoveObserver(JobListObserver* observer) {
164 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
165 observer_list_.RemoveObserver(observer);
// Cancels the job identified by |job_id|. A running job is cancelled through
// its |cancel_callback| when one is set; AbortNotRunningJob() is called with
// GDATA_CANCELLED, presumably for jobs that are not running (TODO confirm).
168 void JobScheduler::CancelJob(JobID job_id) {
169 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
171 JobEntry* job = job_map_.Lookup(job_id);
173 if (job->job_info.state == STATE_RUNNING) {
174 // If the job is running an HTTP request, cancel it via |cancel_callback|
175 // returned from the request, and wait for termination in the normal
176 // callback handler, OnJobDone.
177 if (!job->cancel_callback.is_null())
178 job->cancel_callback.Run();
180 AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
// Calls CancelJob() for every job ID currently in |job_map_|.
185 void JobScheduler::CancelAllJobs() {
186 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
188 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
189 // removal during iteration.
190 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
191 CancelJob(iter.GetCurrentKey());
// Creates a TYPE_GET_ABOUT_RESOURCE job whose task calls
// DriveServiceInterface::GetAboutResource() on |drive_service_|. The result is
// routed through OnGetAboutResourceJobDone(), bound with a weak pointer and
// the new job's ID. |callback| must not be null.
194 void JobScheduler::GetAboutResource(
195 const google_apis::AboutResourceCallback& callback) {
196 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
197 DCHECK(!callback.is_null());
199 JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
200 new_job->task = base::Bind(
201 &DriveServiceInterface::GetAboutResource,
202 base::Unretained(drive_service_),
203 base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
204 weak_ptr_factory_.GetWeakPtr(),
205 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
207 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_GET_APP_LIST job whose task calls
// DriveServiceInterface::GetAppList() on |drive_service_|. The result is
// routed through OnGetAppListJobDone(). |callback| must not be null.
211 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
212 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
213 DCHECK(!callback.is_null());
215 JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
216 new_job->task = base::Bind(
217 &DriveServiceInterface::GetAppList,
218 base::Unretained(drive_service_),
219 base::Bind(&JobScheduler::OnGetAppListJobDone,
220 weak_ptr_factory_.GetWeakPtr(),
221 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
223 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_GET_ALL_RESOURCE_LIST job whose task calls
// DriveServiceInterface::GetAllResourceList() on |drive_service_|. The result
// is routed through OnGetResourceListJobDone(). |callback| must not be null.
227 void JobScheduler::GetAllResourceList(
228 const google_apis::GetResourceListCallback& callback) {
229 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
230 DCHECK(!callback.is_null());
232 JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
233 new_job->task = base::Bind(
234 &DriveServiceInterface::GetAllResourceList,
235 base::Unretained(drive_service_),
236 base::Bind(&JobScheduler::OnGetResourceListJobDone,
237 weak_ptr_factory_.GetWeakPtr(),
238 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
240 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_GET_RESOURCE_LIST_IN_DIRECTORY job whose task calls
// DriveServiceInterface::GetResourceListInDirectory() with
// |directory_resource_id|. The result is routed through
// OnGetResourceListJobDone(). |callback| must not be null.
244 void JobScheduler::GetResourceListInDirectory(
245 const std::string& directory_resource_id,
246 const google_apis::GetResourceListCallback& callback) {
247 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
248 DCHECK(!callback.is_null());
250 JobEntry* new_job = CreateNewJob(
251 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
252 new_job->task = base::Bind(
253 &DriveServiceInterface::GetResourceListInDirectory,
254 base::Unretained(drive_service_),
255 directory_resource_id,
256 base::Bind(&JobScheduler::OnGetResourceListJobDone,
257 weak_ptr_factory_.GetWeakPtr(),
258 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
260 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_SEARCH job whose task calls DriveServiceInterface::Search()
// on |drive_service_|. The result is routed through
// OnGetResourceListJobDone(). |callback| must not be null.
264 void JobScheduler::Search(
265 const std::string& search_query,
266 const google_apis::GetResourceListCallback& callback) {
267 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
268 DCHECK(!callback.is_null());
270 JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
271 new_job->task = base::Bind(
272 &DriveServiceInterface::Search,
273 base::Unretained(drive_service_),
275 base::Bind(&JobScheduler::OnGetResourceListJobDone,
276 weak_ptr_factory_.GetWeakPtr(),
277 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
279 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_GET_CHANGE_LIST job whose task calls
// DriveServiceInterface::GetChangeList() on |drive_service_|. The result is
// routed through OnGetResourceListJobDone(). |callback| must not be null.
283 void JobScheduler::GetChangeList(
284 int64 start_changestamp,
285 const google_apis::GetResourceListCallback& callback) {
286 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
287 DCHECK(!callback.is_null());
289 JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
290 new_job->task = base::Bind(
291 &DriveServiceInterface::GetChangeList,
292 base::Unretained(drive_service_),
294 base::Bind(&JobScheduler::OnGetResourceListJobDone,
295 weak_ptr_factory_.GetWeakPtr(),
296 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
298 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_GET_REMAINING_CHANGE_LIST job whose task calls
// DriveServiceInterface::GetRemainingChangeList() on |drive_service_|. The
// result is routed through OnGetResourceListJobDone(). |callback| must not be
// null.
302 void JobScheduler::GetRemainingChangeList(
303 const GURL& next_link,
304 const google_apis::GetResourceListCallback& callback) {
305 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
306 DCHECK(!callback.is_null());
308 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
309 new_job->task = base::Bind(
310 &DriveServiceInterface::GetRemainingChangeList,
311 base::Unretained(drive_service_),
313 base::Bind(&JobScheduler::OnGetResourceListJobDone,
314 weak_ptr_factory_.GetWeakPtr(),
315 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
317 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_GET_REMAINING_FILE_LIST job whose task calls
// DriveServiceInterface::GetRemainingFileList() on |drive_service_|. The
// result is routed through OnGetResourceListJobDone(). |callback| must not be
// null.
321 void JobScheduler::GetRemainingFileList(
322 const GURL& next_link,
323 const google_apis::GetResourceListCallback& callback) {
324 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
325 DCHECK(!callback.is_null());
327 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
328 new_job->task = base::Bind(
329 &DriveServiceInterface::GetRemainingFileList,
330 base::Unretained(drive_service_),
332 base::Bind(&JobScheduler::OnGetResourceListJobDone,
333 weak_ptr_factory_.GetWeakPtr(),
334 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
336 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_GET_SHARE_URL job that runs under the caller-supplied
// |context| and whose task calls DriveServiceInterface::GetShareUrl() on
// |drive_service_|. The result is routed through OnGetShareUrlJobDone().
// |callback| must not be null.
340 void JobScheduler::GetShareUrl(
341 const std::string& resource_id,
342 const GURL& embed_origin,
343 const ClientContext& context,
344 const google_apis::GetShareUrlCallback& callback) {
345 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
346 DCHECK(!callback.is_null());
348 JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
// Replaces the default context assigned by the JobEntry constructor.
349 new_job->context = context;
350 new_job->task = base::Bind(
351 &DriveServiceInterface::GetShareUrl,
352 base::Unretained(drive_service_),
355 base::Bind(&JobScheduler::OnGetShareUrlJobDone,
356 weak_ptr_factory_.GetWeakPtr(),
357 new_job->job_info.job_id,
359 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_DELETE_RESOURCE job whose task calls
// DriveServiceInterface::DeleteResource() on |drive_service_|. The result is
// routed through OnEntryActionJobDone(). |callback| must not be null.
363 void JobScheduler::DeleteResource(
364 const std::string& resource_id,
365 const google_apis::EntryActionCallback& callback) {
366 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
367 DCHECK(!callback.is_null());
369 JobEntry* new_job = CreateNewJob(TYPE_DELETE_RESOURCE);
370 new_job->task = base::Bind(
371 &DriveServiceInterface::DeleteResource,
372 base::Unretained(drive_service_),
375 base::Bind(&JobScheduler::OnEntryActionJobDone,
376 weak_ptr_factory_.GetWeakPtr(),
377 new_job->job_info.job_id,
// |callback| is stored as the abort callback directly, without the
// CreateErrorRunCallback() wrapper used by other job types.
379 new_job->abort_callback = callback;
// Creates a TYPE_COPY_RESOURCE job whose task calls
// DriveServiceInterface::CopyResource() on |drive_service_|. The result is
// routed through OnGetResourceEntryJobDone(). |callback| must not be null.
383 void JobScheduler::CopyResource(
384 const std::string& resource_id,
385 const std::string& parent_resource_id,
386 const std::string& new_title,
387 const base::Time& last_modified,
388 const google_apis::GetResourceEntryCallback& callback) {
389 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
390 DCHECK(!callback.is_null());
392 JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
393 new_job->task = base::Bind(
394 &DriveServiceInterface::CopyResource,
395 base::Unretained(drive_service_),
400 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
401 weak_ptr_factory_.GetWeakPtr(),
402 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
404 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_COPY_HOSTED_DOCUMENT job whose task calls
// DriveServiceInterface::CopyHostedDocument() on |drive_service_|. The result
// is routed through OnGetResourceEntryJobDone(). |callback| must not be null.
408 void JobScheduler::CopyHostedDocument(
409 const std::string& resource_id,
410 const std::string& new_title,
411 const google_apis::GetResourceEntryCallback& callback) {
412 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
413 DCHECK(!callback.is_null());
415 JobEntry* new_job = CreateNewJob(TYPE_COPY_HOSTED_DOCUMENT);
416 new_job->task = base::Bind(
417 &DriveServiceInterface::CopyHostedDocument,
418 base::Unretained(drive_service_),
421 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
422 weak_ptr_factory_.GetWeakPtr(),
423 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
425 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_MOVE_RESOURCE job whose task calls
// DriveServiceInterface::MoveResource() on |drive_service_|. The result is
// routed through OnGetResourceEntryJobDone(). |callback| must not be null.
429 void JobScheduler::MoveResource(
430 const std::string& resource_id,
431 const std::string& parent_resource_id,
432 const std::string& new_title,
433 const base::Time& last_modified,
434 const google_apis::GetResourceEntryCallback& callback) {
435 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
436 DCHECK(!callback.is_null());
438 JobEntry* new_job = CreateNewJob(TYPE_MOVE_RESOURCE);
439 new_job->task = base::Bind(
440 &DriveServiceInterface::MoveResource,
441 base::Unretained(drive_service_),
446 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
447 weak_ptr_factory_.GetWeakPtr(),
448 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
450 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_RENAME_RESOURCE job whose task calls
// DriveServiceInterface::RenameResource() on |drive_service_|. The result is
// routed through OnEntryActionJobDone(). |callback| must not be null.
454 void JobScheduler::RenameResource(
455 const std::string& resource_id,
456 const std::string& new_title,
457 const google_apis::EntryActionCallback& callback) {
458 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
459 DCHECK(!callback.is_null());
461 JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
462 new_job->task = base::Bind(
463 &DriveServiceInterface::RenameResource,
464 base::Unretained(drive_service_),
467 base::Bind(&JobScheduler::OnEntryActionJobDone,
468 weak_ptr_factory_.GetWeakPtr(),
469 new_job->job_info.job_id,
// |callback| is stored as the abort callback directly.
471 new_job->abort_callback = callback;
// Creates a TYPE_TOUCH_RESOURCE job whose task calls
// DriveServiceInterface::TouchResource() on |drive_service_|. The result is
// routed through OnGetResourceEntryJobDone(). |callback| must not be null.
475 void JobScheduler::TouchResource(
476 const std::string& resource_id,
477 const base::Time& modified_date,
478 const base::Time& last_viewed_by_me_date,
479 const google_apis::GetResourceEntryCallback& callback) {
480 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
481 DCHECK(!callback.is_null());
483 JobEntry* new_job = CreateNewJob(TYPE_TOUCH_RESOURCE);
484 new_job->task = base::Bind(
485 &DriveServiceInterface::TouchResource,
486 base::Unretained(drive_service_),
489 last_viewed_by_me_date,
490 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
491 weak_ptr_factory_.GetWeakPtr(),
492 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
494 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_ADD_RESOURCE_TO_DIRECTORY job whose task calls
// DriveServiceInterface::AddResourceToDirectory() on |drive_service_|. The
// result is routed through OnEntryActionJobDone(). |callback| must not be
// null.
498 void JobScheduler::AddResourceToDirectory(
499 const std::string& parent_resource_id,
500 const std::string& resource_id,
501 const google_apis::EntryActionCallback& callback) {
502 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
503 DCHECK(!callback.is_null());
505 JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
506 new_job->task = base::Bind(
507 &DriveServiceInterface::AddResourceToDirectory,
508 base::Unretained(drive_service_),
511 base::Bind(&JobScheduler::OnEntryActionJobDone,
512 weak_ptr_factory_.GetWeakPtr(),
513 new_job->job_info.job_id,
// |callback| is stored as the abort callback directly.
515 new_job->abort_callback = callback;
// Creates a TYPE_REMOVE_RESOURCE_FROM_DIRECTORY job whose task calls
// DriveServiceInterface::RemoveResourceFromDirectory() on |drive_service_|.
// The result is routed through OnEntryActionJobDone().
// NOTE(review): unlike AddResourceToDirectory(), no DCHECK that |callback| is
// non-null is visible here — confirm whether that is intentional.
519 void JobScheduler::RemoveResourceFromDirectory(
520 const std::string& parent_resource_id,
521 const std::string& resource_id,
522 const google_apis::EntryActionCallback& callback) {
523 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
525 JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
526 new_job->task = base::Bind(
527 &DriveServiceInterface::RemoveResourceFromDirectory,
528 base::Unretained(drive_service_),
531 base::Bind(&JobScheduler::OnEntryActionJobDone,
532 weak_ptr_factory_.GetWeakPtr(),
533 new_job->job_info.job_id,
// |callback| is stored as the abort callback directly.
535 new_job->abort_callback = callback;
// Creates a TYPE_ADD_NEW_DIRECTORY job whose task calls
// DriveServiceInterface::AddNewDirectory() on |drive_service_|. The result is
// routed through OnGetResourceEntryJobDone().
// NOTE(review): no DCHECK that |callback| is non-null is visible here, unlike
// most sibling methods — confirm whether that is intentional.
539 void JobScheduler::AddNewDirectory(
540 const std::string& parent_resource_id,
541 const std::string& directory_title,
542 const google_apis::GetResourceEntryCallback& callback) {
543 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
545 JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
546 new_job->task = base::Bind(
547 &DriveServiceInterface::AddNewDirectory,
548 base::Unretained(drive_service_),
551 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
552 weak_ptr_factory_.GetWeakPtr(),
553 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
555 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_DOWNLOAD_FILE job under |context| whose task calls
// DriveServiceInterface::DownloadFile() on |drive_service_|. Completion is
// routed through OnDownloadActionJobDone() and progress through
// UpdateProgress(). Returns the ID of the new job.
559 JobID JobScheduler::DownloadFile(
560 const base::FilePath& virtual_path,
561 int64 expected_file_size,
562 const base::FilePath& local_cache_path,
563 const std::string& resource_id,
564 const ClientContext& context,
565 const google_apis::DownloadActionCallback& download_action_callback,
566 const google_apis::GetContentCallback& get_content_callback) {
567 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
569 JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
// Record the virtual path and expected size in the job info.
570 new_job->job_info.file_path = virtual_path;
571 new_job->job_info.num_total_bytes = expected_file_size;
572 new_job->context = context;
573 new_job->task = base::Bind(
574 &DriveServiceInterface::DownloadFile,
575 base::Unretained(drive_service_),
578 base::Bind(&JobScheduler::OnDownloadActionJobDone,
579 weak_ptr_factory_.GetWeakPtr(),
580 new_job->job_info.job_id,
581 download_action_callback),
// |get_content_callback| is handed to the service unchanged.
582 get_content_callback,
583 base::Bind(&JobScheduler::UpdateProgress,
584 weak_ptr_factory_.GetWeakPtr(),
585 new_job->job_info.job_id));
586 new_job->abort_callback =
587 google_apis::CreateErrorRunCallback(download_action_callback);
589 return new_job->job_info.job_id;
// Creates a TYPE_UPLOAD_NEW_FILE job under |context| whose task runs
// RunUploadNewFile() on |uploader_| with the given upload parameters.
// Completion is routed through OnUploadCompletionJobDone() and progress
// through UpdateProgress().
// NOTE(review): no DCHECK that |callback| is non-null is visible here —
// confirm whether that is intentional.
592 void JobScheduler::UploadNewFile(
593 const std::string& parent_resource_id,
594 const base::FilePath& drive_file_path,
595 const base::FilePath& local_file_path,
596 const std::string& title,
597 const std::string& content_type,
598 const ClientContext& context,
599 const google_apis::GetResourceEntryCallback& callback) {
600 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
602 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
603 new_job->job_info.file_path = drive_file_path;
604 new_job->context = context;
606 UploadNewFileParams params;
607 params.parent_resource_id = parent_resource_id;
608 params.local_file_path = local_file_path;
609 params.title = title;
610 params.content_type = content_type;
// Keep the values that a later resume attempt needs.
612 ResumeUploadParams resume_params;
613 resume_params.local_file_path = params.local_file_path;
614 resume_params.content_type = params.content_type;
616 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
617 weak_ptr_factory_.GetWeakPtr(),
618 new_job->job_info.job_id,
621 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
622 weak_ptr_factory_.GetWeakPtr(),
623 new_job->job_info.job_id);
624 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
625 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_UPLOAD_EXISTING_FILE job under |context| whose task runs
// RunUploadExistingFile() on |uploader_| with the given upload parameters.
// Completion is routed through OnUploadCompletionJobDone() and progress
// through UpdateProgress().
629 void JobScheduler::UploadExistingFile(
630 const std::string& resource_id,
631 const base::FilePath& drive_file_path,
632 const base::FilePath& local_file_path,
633 const std::string& content_type,
634 const std::string& etag,
635 const ClientContext& context,
636 const google_apis::GetResourceEntryCallback& callback) {
637 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
639 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
640 new_job->job_info.file_path = drive_file_path;
641 new_job->context = context;
643 UploadExistingFileParams params;
644 params.resource_id = resource_id;
645 params.local_file_path = local_file_path;
646 params.content_type = content_type;
// Keep the values that a later resume attempt needs.
649 ResumeUploadParams resume_params;
650 resume_params.local_file_path = params.local_file_path;
651 resume_params.content_type = params.content_type;
653 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
654 weak_ptr_factory_.GetWeakPtr(),
655 new_job->job_info.job_id,
658 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
659 weak_ptr_factory_.GetWeakPtr(),
660 new_job->job_info.job_id);
661 new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
662 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_CREATE_FILE job under |context|. The file is created by
// running RunUploadNewFile() with "/dev/null" as the local file, i.e. by
// uploading an empty file, and no progress callback is installed. Completion
// is routed through OnUploadCompletionJobDone().
666 void JobScheduler::CreateFile(
667 const std::string& parent_resource_id,
668 const base::FilePath& drive_file_path,
669 const std::string& title,
670 const std::string& content_type,
671 const ClientContext& context,
672 const google_apis::GetResourceEntryCallback& callback) {
673 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
675 const base::FilePath kDevNull(FILE_PATH_LITERAL("/dev/null"));
677 JobEntry* new_job = CreateNewJob(TYPE_CREATE_FILE);
678 new_job->job_info.file_path = drive_file_path;
679 new_job->context = context;
681 UploadNewFileParams params;
682 params.parent_resource_id = parent_resource_id;
683 params.local_file_path = kDevNull; // Upload an empty file.
684 params.title = title;
685 params.content_type = content_type;
// Keep the values that a later resume attempt needs.
687 ResumeUploadParams resume_params;
688 resume_params.local_file_path = params.local_file_path;
689 resume_params.content_type = params.content_type;
691 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
692 weak_ptr_factory_.GetWeakPtr(),
693 new_job->job_info.job_id,
// A default-constructed (null) progress callback: no progress is reported.
696 params.progress_callback = google_apis::ProgressCallback();
698 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
699 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI job whose task calls
// DriveServiceInterface::GetResourceListInDirectoryByWapi() with
// |directory_resource_id|. The result is routed through
// OnGetResourceListJobDone(). |callback| must not be null.
703 void JobScheduler::GetResourceListInDirectoryByWapi(
704 const std::string& directory_resource_id,
705 const google_apis::GetResourceListCallback& callback) {
706 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
707 DCHECK(!callback.is_null());
709 JobEntry* new_job = CreateNewJob(
710 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI);
711 new_job->task = base::Bind(
712 &DriveServiceInterface::GetResourceListInDirectoryByWapi,
713 base::Unretained(drive_service_),
714 directory_resource_id,
715 base::Bind(&JobScheduler::OnGetResourceListJobDone,
716 weak_ptr_factory_.GetWeakPtr(),
717 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
719 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Creates a TYPE_GET_REMAINING_RESOURCE_LIST job whose task calls
// DriveServiceInterface::GetRemainingResourceList() on |drive_service_|. The
// result is routed through OnGetResourceListJobDone(). |callback| must not be
// null.
723 void JobScheduler::GetRemainingResourceList(
724 const GURL& next_link,
725 const google_apis::GetResourceListCallback& callback) {
726 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
727 DCHECK(!callback.is_null());
729 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_RESOURCE_LIST);
730 new_job->task = base::Bind(
731 &DriveServiceInterface::GetRemainingResourceList,
732 base::Unretained(drive_service_),
734 base::Bind(&JobScheduler::OnGetResourceListJobDone,
735 weak_ptr_factory_.GetWeakPtr(),
736 new_job->job_info.job_id,
// The abort callback is derived from |callback| via CreateErrorRunCallback().
738 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
// Allocates a JobEntry of |type|, registers it in |job_map_| and stores the ID
// returned by the map in the entry's job_info.
742 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
743 JobEntry* job = new JobEntry(type);
744 job->job_info.job_id = job_map_.Add(job); // Takes the ownership of |job|.
// Queues |job|, reports it through NotifyJobAdded() and runs the job loop of
// the queue that the job's type maps to. |job->task| must already be set.
748 void JobScheduler::StartJob(JobEntry* job) {
749 DCHECK(!job->task.is_null());
751 QueueJob(job->job_info.job_id);
752 NotifyJobAdded(job->job_info);
753 DoJobLoop(GetJobQueueType(job->job_info.job_type));
// Pushes |job_id| onto the queue selected by the job's type, keyed by the
// job's context type, and logs the event.
756 void JobScheduler::QueueJob(JobID job_id) {
757 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
759 JobEntry* job_entry = job_map_.Lookup(job_id);
761 const JobInfo& job_info = job_entry->job_info;
763 QueueType queue_type = GetJobQueueType(job_info.job_type);
764 queue_[queue_type]->Push(job_id, job_entry->context.type);
// The retry count is added to the log message only for re-queued jobs.
766 const std::string retry_prefix = job_entry->retry_count > 0 ?
767 base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
768 util::Log(logging::LOG_INFO,
769 "Job queued%s: %s - %s",
770 retry_prefix.c_str(),
771 job_info.ToString().c_str(),
772 GetQueueInfo(queue_type).c_str());
// Drives the queue |queue_type|: aborts queued USER_INITIATED jobs when they
// are not currently accepted, re-posts itself with a delay while throttled,
// and otherwise pops a job at the accepted priority, marks it running and runs
// its task.
775 void JobScheduler::DoJobLoop(QueueType queue_type) {
776 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
778 const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
780 // Abort all USER_INITIATED jobs when not accepted.
781 if (accepted_priority < USER_INITIATED) {
782 std::vector<JobID> jobs;
783 queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
784 for (size_t i = 0; i < jobs.size(); ++i) {
785 JobEntry* job = job_map_.Lookup(jobs[i]);
787 AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
791 // Wait when throttled.
792 const base::Time now = base::Time::Now();
793 if (now < wait_until_) {
794 base::MessageLoopProxy::current()->PostDelayedTask(
796 base::Bind(&JobScheduler::DoJobLoop,
797 weak_ptr_factory_.GetWeakPtr(),
803 // Run the job with the highest priority in the queue.
805 if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
808 JobEntry* entry = job_map_.Lookup(job_id);
// Mark the job as running and report the state change before the task starts.
811 JobInfo* job_info = &entry->job_info;
812 job_info->state = STATE_RUNNING;
813 job_info->start_time = now;
814 NotifyJobUpdated(*job_info);
// The task's return value is kept so that CancelJob() can cancel the request.
816 entry->cancel_callback = entry->task.Run();
820 util::Log(logging::LOG_INFO,
821 "Job started: %s - %s",
822 job_info->ToString().c_str(),
823 GetQueueInfo(queue_type).c_str());
// Returns the priority value currently accepted for |queue_type|:
// kNoJobShouldRun (-1) when Drive is disabled by preference or the network is
// offline, and USER_INITIATED for the file queue on a cellular connection when
// the kDisableDriveOverCellular preference is set.
826 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
827 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
829 const int kNoJobShouldRun = -1;
831 // Should stop if Drive was disabled while running the fetch loop.
832 if (pref_service_->GetBoolean(prefs::kDisableDrive))
833 return kNoJobShouldRun;
835 // Should stop if the network is not online.
836 if (net::NetworkChangeNotifier::IsOffline())
837 return kNoJobShouldRun;
839 // For the file queue, if it is on cellular network, only user initiated
840 // operations are allowed to start.
841 if (queue_type == FILE_QUEUE &&
842 pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
843 net::NetworkChangeNotifier::IsConnectionCellular(
844 net::NetworkChangeNotifier::GetConnectionType()))
845 return USER_INITIATED;
847 // Otherwise, all operations, including background tasks, are allowed.
// Pushes |wait_until_| forward according to |throttle_count_|. The delay is
// 2^(throttle_count_ - 1) seconds plus a random 0-1000 ms, and |wait_until_|
// is never moved earlier than its current value.
851 void JobScheduler::UpdateWait() {
852 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
854 if (disable_throttling_ || throttle_count_ == 0)
857 // Exponential backoff: https://developers.google.com/drive/handle-errors.
858 base::TimeDelta delay =
859 base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
860 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
861 VLOG(1) << "Throttling for " << delay.InMillisecondsF();
863 wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
// Common completion handling for the job |job_id| that ended with |error|.
// Marks the job finished in its queue, logs the outcome, and treats
// HTTP_SERVICE_UNAVAILABLE and HTTP_INTERNAL_SERVER_ERROR as server errors
// that can trigger a retry while |retry_count| is below kMaxRetryCount.
// Returns true when no retry will happen, i.e. when the caller should run the
// client callback; returns false when the job is going to be retried.
866 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
867 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
869 JobEntry* job_entry = job_map_.Lookup(job_id);
871 JobInfo* job_info = &job_entry->job_info;
872 QueueType queue_type = GetJobQueueType(job_info->job_type);
873 queue_[queue_type]->MarkFinished(job_id);
875 const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
// Failures are logged at warning level, successes at info level.
876 bool success = (GDataToFileError(error) == FILE_ERROR_OK);
877 util::Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
878 "Job done: %s => %s (elapsed time: %sms) - %s",
879 job_info->ToString().c_str(),
880 GDataErrorCodeToString(error).c_str(),
881 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
882 GetQueueInfo(queue_type).c_str());
884 // Retry, depending on the error.
885 const bool is_server_error =
886 error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
887 error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
888 if (is_server_error) {
889 if (throttle_count_ < kMaxThrottleCount)
896 const bool should_retry =
897 is_server_error && job_entry->retry_count < kMaxRetryCount;
// Retry bookkeeping: drop the stale cancel callback, flag the job as
// STATE_RETRY, notify observers and count the attempt.
899 job_entry->cancel_callback.Reset();
900 job_info->state = STATE_RETRY;
901 NotifyJobUpdated(*job_info);
903 ++job_entry->retry_count;
908 NotifyJobDone(*job_info, error);
909 // The job has finished, no retry will happen in the scheduler. Now we can
910 // remove the job info from the map.
911 job_map_.Remove(job_id);
914 // Post a task to continue the job loop. This allows us to finish handling
915 // the current job before starting the next one.
916 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
917 base::Bind(&JobScheduler::DoJobLoop,
918 weak_ptr_factory_.GetWeakPtr(),
920 return !should_retry;
// Completion handler for resource-list jobs. Runs |callback| with |error| and
// |resource_list| only when OnJobDone() returns true (no retry pending).
923 void JobScheduler::OnGetResourceListJobDone(
925 const google_apis::GetResourceListCallback& callback,
926 google_apis::GDataErrorCode error,
927 scoped_ptr<google_apis::ResourceList> resource_list) {
928 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
929 DCHECK(!callback.is_null());
931 if (OnJobDone(job_id, error))
932 callback.Run(error, resource_list.Pass());
// Completion handler for resource-entry jobs. Runs |callback| with |error| and
// |entry| only when OnJobDone() returns true (no retry pending).
935 void JobScheduler::OnGetResourceEntryJobDone(
937 const google_apis::GetResourceEntryCallback& callback,
938 google_apis::GDataErrorCode error,
939 scoped_ptr<google_apis::ResourceEntry> entry) {
940 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
941 DCHECK(!callback.is_null());
943 if (OnJobDone(job_id, error))
944 callback.Run(error, entry.Pass());
// Completion handler for about-resource jobs. Runs |callback| with |error| and
// |about_resource| only when OnJobDone() returns true (no retry pending).
947 void JobScheduler::OnGetAboutResourceJobDone(
949 const google_apis::AboutResourceCallback& callback,
950 google_apis::GDataErrorCode error,
951 scoped_ptr<google_apis::AboutResource> about_resource) {
952 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
953 DCHECK(!callback.is_null());
955 if (OnJobDone(job_id, error))
956 callback.Run(error, about_resource.Pass());
// Completion handler for share-URL jobs. Runs |callback| with |error| and
// |share_url| only when OnJobDone() returns true (no retry pending).
959 void JobScheduler::OnGetShareUrlJobDone(
961 const google_apis::GetShareUrlCallback& callback,
962 google_apis::GDataErrorCode error,
963 const GURL& share_url) {
964 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
965 DCHECK(!callback.is_null());
967 if (OnJobDone(job_id, error))
968 callback.Run(error, share_url);
// Completion handler for app-list jobs. Runs |callback| with |error| and
// |app_list| only when OnJobDone() returns true (no retry pending).
971 void JobScheduler::OnGetAppListJobDone(
973 const google_apis::AppListCallback& callback,
974 google_apis::GDataErrorCode error,
975 scoped_ptr<google_apis::AppList> app_list) {
976 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
977 DCHECK(!callback.is_null());
979 if (OnJobDone(job_id, error))
980 callback.Run(error, app_list.Pass());
// Completion handler for entry-action jobs (those that report only an error
// code). The follow-up action is gated on OnJobDone() returning true.
983 void JobScheduler::OnEntryActionJobDone(
985 const google_apis::EntryActionCallback& callback,
986 google_apis::GDataErrorCode error) {
987 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
988 DCHECK(!callback.is_null());
990 if (OnJobDone(job_id, error))
// Completion handler for download jobs. Runs |callback| with |error| and
// |temp_file| only when OnJobDone() returns true (no retry pending).
994 void JobScheduler::OnDownloadActionJobDone(
996 const google_apis::DownloadActionCallback& callback,
997 google_apis::GDataErrorCode error,
998 const base::FilePath& temp_file) {
999 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1000 DCHECK(!callback.is_null());
1002 if (OnJobDone(job_id, error))
1003 callback.Run(error, temp_file);
// Completion handler for upload jobs. When |upload_location| is non-empty, the
// job's task is replaced with one that resumes the upload from that location
// (RunResumeUploadFile) before OnJobDone() decides whether to retry. Runs
// |callback| with |error| and |resource_entry| only when OnJobDone() returns
// true.
1006 void JobScheduler::OnUploadCompletionJobDone(
1008 const ResumeUploadParams& resume_params,
1009 const google_apis::GetResourceEntryCallback& callback,
1010 google_apis::GDataErrorCode error,
1011 const GURL& upload_location,
1012 scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1013 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1014 DCHECK(!callback.is_null());
1016 if (!upload_location.is_empty()) {
1017 // If upload_location is available, update the task to resume the
1018 // upload process from the terminated point.
1019 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1020 // so OnJobDone called below will be in charge to re-queue the job.
1021 JobEntry* job_entry = job_map_.Lookup(job_id);
1024 ResumeUploadFileParams params;
1025 params.upload_location = upload_location;
1026 params.local_file_path = resume_params.local_file_path;
1027 params.content_type = resume_params.content_type;
1028 params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1029 weak_ptr_factory_.GetWeakPtr(),
1033 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1034 weak_ptr_factory_.GetWeakPtr(),
1036 job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1039 if (OnJobDone(job_id, error))
1040 callback.Run(error, resource_entry.Pass());
// Completion handler for a resumed upload.
//
// Must run on the UI thread with non-null |original_task| and |callback|.
// When |upload_location| is empty the upload can no longer be resumed, so the
// job entry's task is set back to |original_task|. The outcome is then
// reported to OnJobDone(); |callback| is invoked with |error| and
// |resource_entry| only when OnJobDone() returns true.
1043 void JobScheduler::OnResumeUploadFileDone(
1045 const base::Callback<google_apis::CancelCallback()>& original_task,
1046 const google_apis::GetResourceEntryCallback& callback,
1047 google_apis::GDataErrorCode error,
1048 const GURL& upload_location,
1049 scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1050 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1051 DCHECK(!original_task.is_null());
1052 DCHECK(!callback.is_null());
1054 if (upload_location.is_empty()) {
1055 // If upload_location is not available, we should discard it and stop trying
1056 // to resume. Restore the original task.
// NOTE(review): |job_entry| is dereferenced right below; confirm that the
// lookup result is verified to be non-null.
1057 JobEntry* job_entry = job_map_.Lookup(job_id);
1059 job_entry->task = original_task;
// Ownership of |resource_entry| is passed on to |callback| when it is run.
1062 if (OnJobDone(job_id, error))
1063 callback.Run(error, resource_entry.Pass());
// Progress callback for a transfer job.
//
// Looks up the entry for |job_id| in |job_map_|, updates the byte counters in
// its JobInfo from |progress| (completed bytes) and |total| (total bytes), and
// notifies the observers through NotifyJobUpdated().
// NOTE(review): the lookup result is dereferenced; confirm that it is verified
// to be non-null.
1066 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1067 JobEntry* job_entry = job_map_.Lookup(job_id);
1070 job_entry->job_info.num_completed_bytes = progress;
1072 job_entry->job_info.num_total_bytes = total;
1073 NotifyJobUpdated(job_entry->job_info);
// Called when the network connection type changes.
//
// Must run on the UI thread. |type| is not inspected here; DoJobLoop() is
// simply invoked once for every queue, from METADATA_QUEUE up to (but not
// including) NUM_QUEUES.
1076 void JobScheduler::OnConnectionTypeChanged(
1077 net::NetworkChangeNotifier::ConnectionType type) {
1078 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1080 // Resume the job loop.
1081 // Note that we don't need to check the network connection status as it will
1082 // be checked in GetCurrentAcceptedPriority().
1083 for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1084 DoJobLoop(static_cast<QueueType>(i));
// Maps a job type to the queue that jobs of that type are placed in.
//
// The first group of cases below (listing, sharing and resource manipulation
// requests) maps to METADATA_QUEUE.
1087 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1089 case TYPE_GET_ABOUT_RESOURCE:
1090 case TYPE_GET_APP_LIST:
1091 case TYPE_GET_ALL_RESOURCE_LIST:
1092 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1094 case TYPE_GET_CHANGE_LIST:
1095 case TYPE_GET_REMAINING_CHANGE_LIST:
1096 case TYPE_GET_REMAINING_FILE_LIST:
1097 case TYPE_GET_SHARE_URL:
1098 case TYPE_DELETE_RESOURCE:
1099 case TYPE_COPY_RESOURCE:
1100 case TYPE_COPY_HOSTED_DOCUMENT:
1101 case TYPE_MOVE_RESOURCE:
1102 case TYPE_RENAME_RESOURCE:
1103 case TYPE_TOUCH_RESOURCE:
1104 case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1105 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1106 case TYPE_ADD_NEW_DIRECTORY:
1107 case TYPE_CREATE_FILE:
1108 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI:
1109 case TYPE_GET_REMAINING_RESOURCE_LIST:
1110 return METADATA_QUEUE;
// File transfer jobs (download and both kinds of upload).
// NOTE(review): these presumably map to FILE_QUEUE - confirm.
1112 case TYPE_DOWNLOAD_FILE:
1113 case TYPE_UPLOAD_NEW_FILE:
1114 case TYPE_UPLOAD_EXISTING_FILE:
// Aborts |job| with |error|.
//
// Must run on the UI thread. Logs the abort (job description, error string,
// time elapsed since the job's start_time, and the state of the job's queue),
// removes the job from its queue and from |job_map_|, notifies the observers
// through NotifyJobDone(), and finally posts the job's abort callback with
// |error| to the current message loop.
// NOTE(review): as the name says, |job| is expected not to be running; this is
// not checked here - confirm that callers guarantee it.
1121 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1122 google_apis::GDataErrorCode error) {
1123 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1125 const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1126 const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1127 util::Log(logging::LOG_INFO,
1128 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1129 job->job_info.ToString().c_str(),
1130 GDataErrorCodeToString(error).c_str(),
1131 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1132 GetQueueInfo(queue_type).c_str());
// Copy the callback out of the entry before the entry is removed from
// |job_map_| below.
// NOTE(review): presumably job_map_.Remove() destroys |job|, which is why
// nothing reads |job| after that call - confirm.
1134 base::Callback<void(google_apis::GDataErrorCode)> callback =
1135 job->abort_callback;
// NOTE(review): this repeats the GetJobQueueType() lookup whose result is
// already held in |queue_type|.
1136 queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1137 NotifyJobDone(job->job_info, error);
1138 job_map_.Remove(job->job_info.job_id);
// The abort callback is posted as a task rather than run synchronously.
1139 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1140 base::Bind(callback, error));
// Calls OnJobAdded(|job_info|) on every JobListObserver registered in
// |observer_list_|. Must run on the UI thread.
1143 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1144 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1145 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
// Calls OnJobDone() on every JobListObserver registered in |observer_list_|.
// Observers receive |job_info| together with |error| converted by
// GDataToFileError(). Must run on the UI thread.
1148 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1149 google_apis::GDataErrorCode error) {
1150 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1151 FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1152 OnJobDone(job_info, GDataToFileError(error)));
// Calls OnJobUpdated(|job_info|) on every JobListObserver registered in
// |observer_list_|. Must run on the UI thread.
1155 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1156 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1157 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
// Returns a description of the queue identified by |type|: the queue type's
// name from QueueTypeToString(), a single space, and the result of the queue's
// own ToString(). AbortNotRunningJob() includes this in its log message.
1160 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1161 return QueueTypeToString(type) + " " + queue_[type]->ToString();
// Returns the name of a queue type as a string; the returned text is the
// enumerator's identifier ("METADATA_QUEUE", "FILE_QUEUE").
// NOTE(review): the final branch handles a sentinel value that is not a real
// queue - presumably NUM_QUEUES; confirm.
1165 std::string JobScheduler::QueueTypeToString(QueueType type) {
1167 case METADATA_QUEUE:
1168 return "METADATA_QUEUE";
1170 return "FILE_QUEUE";
1172 break; // This value is just a sentinel. Should never be used.
1178 } // namespace drive