- add sources.
[platform/framework/web/crosswalk.git] / src / chrome / browser / chromeos / drive / job_scheduler.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
6
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/chromeos/drive/file_system_util.h"
13 #include "chrome/browser/chromeos/drive/logging.h"
14 #include "chrome/browser/google_apis/drive_api_parser.h"
15 #include "chrome/browser/google_apis/task_util.h"
16 #include "chrome/common/pref_names.h"
17 #include "content/public/browser/browser_thread.h"
18
19 using content::BrowserThread;
20
21 namespace drive {
22
23 namespace {
24
// Upper bound related to request throttling.
// NOTE(review): the use site is outside this excerpt; presumably this caps
// |throttle_count_| -- confirm.
const int kMaxThrottleCount = 4;

// Maximum number of retries. According to the API documentation
// (https://developers.google.com/drive/handle-errors) this should be equal to
// kMaxThrottleCount, but it is currently doubled so that upload related jobs
// are retried a sufficient number of times. See crbug.com/269918.
const int kMaxRetryCount = kMaxThrottleCount * 2;
32
33 // Parameter struct for RunUploadNewFile.
34 struct UploadNewFileParams {
35   std::string parent_resource_id;
36   base::FilePath local_file_path;
37   std::string title;
38   std::string content_type;
39   UploadCompletionCallback callback;
40   google_apis::ProgressCallback progress_callback;
41 };
42
43 // Helper function to work around the arity limitation of base::Bind.
44 google_apis::CancelCallback RunUploadNewFile(
45     DriveUploaderInterface* uploader,
46     const UploadNewFileParams& params) {
47   return uploader->UploadNewFile(params.parent_resource_id,
48                                  params.local_file_path,
49                                  params.title,
50                                  params.content_type,
51                                  params.callback,
52                                  params.progress_callback);
53 }
54
55 // Parameter struct for RunUploadExistingFile.
56 struct UploadExistingFileParams {
57   std::string resource_id;
58   base::FilePath local_file_path;
59   std::string content_type;
60   std::string etag;
61   UploadCompletionCallback callback;
62   google_apis::ProgressCallback progress_callback;
63 };
64
65 // Helper function to work around the arity limitation of base::Bind.
66 google_apis::CancelCallback RunUploadExistingFile(
67     DriveUploaderInterface* uploader,
68     const UploadExistingFileParams& params) {
69   return uploader->UploadExistingFile(params.resource_id,
70                                       params.local_file_path,
71                                       params.content_type,
72                                       params.etag,
73                                       params.callback,
74                                       params.progress_callback);
75 }
76
77 // Parameter struct for RunResumeUploadFile.
78 struct ResumeUploadFileParams {
79   GURL upload_location;
80   base::FilePath local_file_path;
81   std::string content_type;
82   UploadCompletionCallback callback;
83   google_apis::ProgressCallback progress_callback;
84 };
85
86 // Helper function to adjust the return type.
87 google_apis::CancelCallback RunResumeUploadFile(
88     DriveUploaderInterface* uploader,
89     const ResumeUploadFileParams& params) {
90   return uploader->ResumeUploadFile(params.upload_location,
91                                     params.local_file_path,
92                                     params.content_type,
93                                     params.callback,
94                                     params.progress_callback);
95 }
96
97 }  // namespace
98
99 const int JobScheduler::kMaxJobCount[] = {
100   5,  // METADATA_QUEUE
101   1,  // FILE_QUEUE
102 };
103
104 JobScheduler::JobEntry::JobEntry(JobType type)
105     : job_info(type),
106       context(ClientContext(USER_INITIATED)),
107       retry_count(0) {
108   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
109 }
110
111 JobScheduler::JobEntry::~JobEntry() {
112   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
113 }
114
115 struct JobScheduler::ResumeUploadParams {
116   base::FilePath drive_file_path;
117   base::FilePath local_file_path;
118   std::string content_type;
119 };
120
121 JobScheduler::JobScheduler(
122     PrefService* pref_service,
123     DriveServiceInterface* drive_service,
124     base::SequencedTaskRunner* blocking_task_runner)
125     : throttle_count_(0),
126       wait_until_(base::Time::Now()),
127       disable_throttling_(false),
128       drive_service_(drive_service),
129       uploader_(new DriveUploader(drive_service, blocking_task_runner)),
130       pref_service_(pref_service),
131       weak_ptr_factory_(this) {
132   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
133
134   for (int i = 0; i < NUM_QUEUES; ++i)
135     queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
136
137   net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
138 }
139
140 JobScheduler::~JobScheduler() {
141   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
142
143   size_t num_queued_jobs = 0;
144   for (int i = 0; i < NUM_QUEUES; ++i)
145     num_queued_jobs += queue_[i]->GetNumberOfJobs();
146   DCHECK_EQ(num_queued_jobs, job_map_.size());
147
148   net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
149 }
150
151 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
152   std::vector<JobInfo> job_info_list;
153   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
154     job_info_list.push_back(iter.GetCurrentValue()->job_info);
155   return job_info_list;
156 }
157
158 void JobScheduler::AddObserver(JobListObserver* observer) {
159   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
160   observer_list_.AddObserver(observer);
161 }
162
163 void JobScheduler::RemoveObserver(JobListObserver* observer) {
164   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
165   observer_list_.RemoveObserver(observer);
166 }
167
168 void JobScheduler::CancelJob(JobID job_id) {
169   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
170
171   JobEntry* job = job_map_.Lookup(job_id);
172   if (job) {
173     if (job->job_info.state == STATE_RUNNING) {
174       // If the job is running an HTTP request, cancel it via |cancel_callback|
175       // returned from the request, and wait for termination in the normal
176       // callback handler, OnJobDone.
177       if (!job->cancel_callback.is_null())
178         job->cancel_callback.Run();
179     } else {
180       AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
181     }
182   }
183 }
184
185 void JobScheduler::CancelAllJobs() {
186   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
187
188   // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
189   // removable during iteration.
190   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
191     CancelJob(iter.GetCurrentKey());
192 }
193
194 void JobScheduler::GetAboutResource(
195     const google_apis::AboutResourceCallback& callback) {
196   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
197   DCHECK(!callback.is_null());
198
199   JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
200   new_job->task = base::Bind(
201       &DriveServiceInterface::GetAboutResource,
202       base::Unretained(drive_service_),
203       base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
204                  weak_ptr_factory_.GetWeakPtr(),
205                  new_job->job_info.job_id,
206                  callback));
207   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
208   StartJob(new_job);
209 }
210
211 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
212   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
213   DCHECK(!callback.is_null());
214
215   JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
216   new_job->task = base::Bind(
217       &DriveServiceInterface::GetAppList,
218       base::Unretained(drive_service_),
219       base::Bind(&JobScheduler::OnGetAppListJobDone,
220                  weak_ptr_factory_.GetWeakPtr(),
221                  new_job->job_info.job_id,
222                  callback));
223   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
224   StartJob(new_job);
225 }
226
227 void JobScheduler::GetAllResourceList(
228     const google_apis::GetResourceListCallback& callback) {
229   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
230   DCHECK(!callback.is_null());
231
232   JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
233   new_job->task = base::Bind(
234       &DriveServiceInterface::GetAllResourceList,
235       base::Unretained(drive_service_),
236       base::Bind(&JobScheduler::OnGetResourceListJobDone,
237                  weak_ptr_factory_.GetWeakPtr(),
238                  new_job->job_info.job_id,
239                  callback));
240   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
241   StartJob(new_job);
242 }
243
244 void JobScheduler::GetResourceListInDirectory(
245     const std::string& directory_resource_id,
246     const google_apis::GetResourceListCallback& callback) {
247   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
248   DCHECK(!callback.is_null());
249
250   JobEntry* new_job = CreateNewJob(
251       TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
252   new_job->task = base::Bind(
253       &DriveServiceInterface::GetResourceListInDirectory,
254       base::Unretained(drive_service_),
255       directory_resource_id,
256       base::Bind(&JobScheduler::OnGetResourceListJobDone,
257                  weak_ptr_factory_.GetWeakPtr(),
258                  new_job->job_info.job_id,
259                  callback));
260   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
261   StartJob(new_job);
262 }
263
264 void JobScheduler::Search(
265     const std::string& search_query,
266     const google_apis::GetResourceListCallback& callback) {
267   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
268   DCHECK(!callback.is_null());
269
270   JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
271   new_job->task = base::Bind(
272       &DriveServiceInterface::Search,
273       base::Unretained(drive_service_),
274       search_query,
275       base::Bind(&JobScheduler::OnGetResourceListJobDone,
276                  weak_ptr_factory_.GetWeakPtr(),
277                  new_job->job_info.job_id,
278                  callback));
279   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
280   StartJob(new_job);
281 }
282
283 void JobScheduler::GetChangeList(
284     int64 start_changestamp,
285     const google_apis::GetResourceListCallback& callback) {
286   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
287   DCHECK(!callback.is_null());
288
289   JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
290   new_job->task = base::Bind(
291       &DriveServiceInterface::GetChangeList,
292       base::Unretained(drive_service_),
293       start_changestamp,
294       base::Bind(&JobScheduler::OnGetResourceListJobDone,
295                  weak_ptr_factory_.GetWeakPtr(),
296                  new_job->job_info.job_id,
297                  callback));
298   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
299   StartJob(new_job);
300 }
301
302 void JobScheduler::GetRemainingChangeList(
303     const GURL& next_link,
304     const google_apis::GetResourceListCallback& callback) {
305   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
306   DCHECK(!callback.is_null());
307
308   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
309   new_job->task = base::Bind(
310       &DriveServiceInterface::GetRemainingChangeList,
311       base::Unretained(drive_service_),
312       next_link,
313       base::Bind(&JobScheduler::OnGetResourceListJobDone,
314                  weak_ptr_factory_.GetWeakPtr(),
315                  new_job->job_info.job_id,
316                  callback));
317   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
318   StartJob(new_job);
319 }
320
321 void JobScheduler::GetRemainingFileList(
322     const GURL& next_link,
323     const google_apis::GetResourceListCallback& callback) {
324   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
325   DCHECK(!callback.is_null());
326
327   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
328   new_job->task = base::Bind(
329       &DriveServiceInterface::GetRemainingFileList,
330       base::Unretained(drive_service_),
331       next_link,
332       base::Bind(&JobScheduler::OnGetResourceListJobDone,
333                  weak_ptr_factory_.GetWeakPtr(),
334                  new_job->job_info.job_id,
335                  callback));
336   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
337   StartJob(new_job);
338 }
339
340 void JobScheduler::GetShareUrl(
341     const std::string& resource_id,
342     const GURL& embed_origin,
343     const ClientContext& context,
344     const google_apis::GetShareUrlCallback& callback) {
345   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
346   DCHECK(!callback.is_null());
347
348   JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
349   new_job->context = context;
350   new_job->task = base::Bind(
351       &DriveServiceInterface::GetShareUrl,
352       base::Unretained(drive_service_),
353       resource_id,
354       embed_origin,
355       base::Bind(&JobScheduler::OnGetShareUrlJobDone,
356                  weak_ptr_factory_.GetWeakPtr(),
357                  new_job->job_info.job_id,
358                  callback));
359   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
360   StartJob(new_job);
361 }
362
363 void JobScheduler::DeleteResource(
364     const std::string& resource_id,
365     const google_apis::EntryActionCallback& callback) {
366   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
367   DCHECK(!callback.is_null());
368
369   JobEntry* new_job = CreateNewJob(TYPE_DELETE_RESOURCE);
370   new_job->task = base::Bind(
371       &DriveServiceInterface::DeleteResource,
372       base::Unretained(drive_service_),
373       resource_id,
374       "",  // etag
375       base::Bind(&JobScheduler::OnEntryActionJobDone,
376                  weak_ptr_factory_.GetWeakPtr(),
377                  new_job->job_info.job_id,
378                  callback));
379   new_job->abort_callback = callback;
380   StartJob(new_job);
381 }
382
383 void JobScheduler::CopyResource(
384     const std::string& resource_id,
385     const std::string& parent_resource_id,
386     const std::string& new_title,
387     const base::Time& last_modified,
388     const google_apis::GetResourceEntryCallback& callback) {
389   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
390   DCHECK(!callback.is_null());
391
392   JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
393   new_job->task = base::Bind(
394       &DriveServiceInterface::CopyResource,
395       base::Unretained(drive_service_),
396       resource_id,
397       parent_resource_id,
398       new_title,
399       last_modified,
400       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
401                  weak_ptr_factory_.GetWeakPtr(),
402                  new_job->job_info.job_id,
403                  callback));
404   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
405   StartJob(new_job);
406 }
407
408 void JobScheduler::CopyHostedDocument(
409     const std::string& resource_id,
410     const std::string& new_title,
411     const google_apis::GetResourceEntryCallback& callback) {
412   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
413   DCHECK(!callback.is_null());
414
415   JobEntry* new_job = CreateNewJob(TYPE_COPY_HOSTED_DOCUMENT);
416   new_job->task = base::Bind(
417       &DriveServiceInterface::CopyHostedDocument,
418       base::Unretained(drive_service_),
419       resource_id,
420       new_title,
421       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
422                  weak_ptr_factory_.GetWeakPtr(),
423                  new_job->job_info.job_id,
424                  callback));
425   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
426   StartJob(new_job);
427 }
428
429 void JobScheduler::MoveResource(
430     const std::string& resource_id,
431     const std::string& parent_resource_id,
432     const std::string& new_title,
433     const base::Time& last_modified,
434     const google_apis::GetResourceEntryCallback& callback) {
435   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
436   DCHECK(!callback.is_null());
437
438   JobEntry* new_job = CreateNewJob(TYPE_MOVE_RESOURCE);
439   new_job->task = base::Bind(
440       &DriveServiceInterface::MoveResource,
441       base::Unretained(drive_service_),
442       resource_id,
443       parent_resource_id,
444       new_title,
445       last_modified,
446       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
447                  weak_ptr_factory_.GetWeakPtr(),
448                  new_job->job_info.job_id,
449                  callback));
450   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
451   StartJob(new_job);
452 }
453
454 void JobScheduler::RenameResource(
455     const std::string& resource_id,
456     const std::string& new_title,
457     const google_apis::EntryActionCallback& callback) {
458   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
459   DCHECK(!callback.is_null());
460
461   JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
462   new_job->task = base::Bind(
463       &DriveServiceInterface::RenameResource,
464       base::Unretained(drive_service_),
465       resource_id,
466       new_title,
467       base::Bind(&JobScheduler::OnEntryActionJobDone,
468                  weak_ptr_factory_.GetWeakPtr(),
469                  new_job->job_info.job_id,
470                  callback));
471   new_job->abort_callback = callback;
472   StartJob(new_job);
473 }
474
475 void JobScheduler::TouchResource(
476     const std::string& resource_id,
477     const base::Time& modified_date,
478     const base::Time& last_viewed_by_me_date,
479     const google_apis::GetResourceEntryCallback& callback) {
480   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
481   DCHECK(!callback.is_null());
482
483   JobEntry* new_job = CreateNewJob(TYPE_TOUCH_RESOURCE);
484   new_job->task = base::Bind(
485       &DriveServiceInterface::TouchResource,
486       base::Unretained(drive_service_),
487       resource_id,
488       modified_date,
489       last_viewed_by_me_date,
490       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
491                  weak_ptr_factory_.GetWeakPtr(),
492                  new_job->job_info.job_id,
493                  callback));
494   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
495   StartJob(new_job);
496 }
497
498 void JobScheduler::AddResourceToDirectory(
499     const std::string& parent_resource_id,
500     const std::string& resource_id,
501     const google_apis::EntryActionCallback& callback) {
502   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
503   DCHECK(!callback.is_null());
504
505   JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
506   new_job->task = base::Bind(
507       &DriveServiceInterface::AddResourceToDirectory,
508       base::Unretained(drive_service_),
509       parent_resource_id,
510       resource_id,
511       base::Bind(&JobScheduler::OnEntryActionJobDone,
512                  weak_ptr_factory_.GetWeakPtr(),
513                  new_job->job_info.job_id,
514                  callback));
515   new_job->abort_callback = callback;
516   StartJob(new_job);
517 }
518
519 void JobScheduler::RemoveResourceFromDirectory(
520     const std::string& parent_resource_id,
521     const std::string& resource_id,
522     const google_apis::EntryActionCallback& callback) {
523   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
524
525   JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
526   new_job->task = base::Bind(
527       &DriveServiceInterface::RemoveResourceFromDirectory,
528       base::Unretained(drive_service_),
529       parent_resource_id,
530       resource_id,
531       base::Bind(&JobScheduler::OnEntryActionJobDone,
532                  weak_ptr_factory_.GetWeakPtr(),
533                  new_job->job_info.job_id,
534                  callback));
535   new_job->abort_callback = callback;
536   StartJob(new_job);
537 }
538
539 void JobScheduler::AddNewDirectory(
540     const std::string& parent_resource_id,
541     const std::string& directory_title,
542     const google_apis::GetResourceEntryCallback& callback) {
543   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
544
545   JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
546   new_job->task = base::Bind(
547       &DriveServiceInterface::AddNewDirectory,
548       base::Unretained(drive_service_),
549       parent_resource_id,
550       directory_title,
551       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
552                  weak_ptr_factory_.GetWeakPtr(),
553                  new_job->job_info.job_id,
554                  callback));
555   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
556   StartJob(new_job);
557 }
558
559 JobID JobScheduler::DownloadFile(
560     const base::FilePath& virtual_path,
561     int64 expected_file_size,
562     const base::FilePath& local_cache_path,
563     const std::string& resource_id,
564     const ClientContext& context,
565     const google_apis::DownloadActionCallback& download_action_callback,
566     const google_apis::GetContentCallback& get_content_callback) {
567   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
568
569   JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
570   new_job->job_info.file_path = virtual_path;
571   new_job->job_info.num_total_bytes = expected_file_size;
572   new_job->context = context;
573   new_job->task = base::Bind(
574       &DriveServiceInterface::DownloadFile,
575       base::Unretained(drive_service_),
576       local_cache_path,
577       resource_id,
578       base::Bind(&JobScheduler::OnDownloadActionJobDone,
579                  weak_ptr_factory_.GetWeakPtr(),
580                  new_job->job_info.job_id,
581                  download_action_callback),
582       get_content_callback,
583       base::Bind(&JobScheduler::UpdateProgress,
584                  weak_ptr_factory_.GetWeakPtr(),
585                  new_job->job_info.job_id));
586   new_job->abort_callback =
587       google_apis::CreateErrorRunCallback(download_action_callback);
588   StartJob(new_job);
589   return new_job->job_info.job_id;
590 }
591
592 void JobScheduler::UploadNewFile(
593     const std::string& parent_resource_id,
594     const base::FilePath& drive_file_path,
595     const base::FilePath& local_file_path,
596     const std::string& title,
597     const std::string& content_type,
598     const ClientContext& context,
599     const google_apis::GetResourceEntryCallback& callback) {
600   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
601
602   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
603   new_job->job_info.file_path = drive_file_path;
604   new_job->context = context;
605
606   UploadNewFileParams params;
607   params.parent_resource_id = parent_resource_id;
608   params.local_file_path = local_file_path;
609   params.title = title;
610   params.content_type = content_type;
611
612   ResumeUploadParams resume_params;
613   resume_params.local_file_path = params.local_file_path;
614   resume_params.content_type = params.content_type;
615
616   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
617                                weak_ptr_factory_.GetWeakPtr(),
618                                new_job->job_info.job_id,
619                                resume_params,
620                                callback);
621   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
622                                         weak_ptr_factory_.GetWeakPtr(),
623                                         new_job->job_info.job_id);
624   new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
625   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
626   StartJob(new_job);
627 }
628
629 void JobScheduler::UploadExistingFile(
630     const std::string& resource_id,
631     const base::FilePath& drive_file_path,
632     const base::FilePath& local_file_path,
633     const std::string& content_type,
634     const std::string& etag,
635     const ClientContext& context,
636     const google_apis::GetResourceEntryCallback& callback) {
637   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
638
639   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
640   new_job->job_info.file_path = drive_file_path;
641   new_job->context = context;
642
643   UploadExistingFileParams params;
644   params.resource_id = resource_id;
645   params.local_file_path = local_file_path;
646   params.content_type = content_type;
647   params.etag = etag;
648
649   ResumeUploadParams resume_params;
650   resume_params.local_file_path = params.local_file_path;
651   resume_params.content_type = params.content_type;
652
653   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
654                                weak_ptr_factory_.GetWeakPtr(),
655                                new_job->job_info.job_id,
656                                resume_params,
657                                callback);
658   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
659                                         weak_ptr_factory_.GetWeakPtr(),
660                                         new_job->job_info.job_id);
661   new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
662   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
663   StartJob(new_job);
664 }
665
666 void JobScheduler::CreateFile(
667     const std::string& parent_resource_id,
668     const base::FilePath& drive_file_path,
669     const std::string& title,
670     const std::string& content_type,
671     const ClientContext& context,
672     const google_apis::GetResourceEntryCallback& callback) {
673   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
674
675   const base::FilePath kDevNull(FILE_PATH_LITERAL("/dev/null"));
676
677   JobEntry* new_job = CreateNewJob(TYPE_CREATE_FILE);
678   new_job->job_info.file_path = drive_file_path;
679   new_job->context = context;
680
681   UploadNewFileParams params;
682   params.parent_resource_id = parent_resource_id;
683   params.local_file_path = kDevNull;  // Upload an empty file.
684   params.title = title;
685   params.content_type = content_type;
686
687   ResumeUploadParams resume_params;
688   resume_params.local_file_path = params.local_file_path;
689   resume_params.content_type = params.content_type;
690
691   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
692                                weak_ptr_factory_.GetWeakPtr(),
693                                new_job->job_info.job_id,
694                                resume_params,
695                                callback);
696   params.progress_callback = google_apis::ProgressCallback();
697
698   new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
699   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
700   StartJob(new_job);
701 }
702
703 void JobScheduler::GetResourceListInDirectoryByWapi(
704     const std::string& directory_resource_id,
705     const google_apis::GetResourceListCallback& callback) {
706   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
707   DCHECK(!callback.is_null());
708
709   JobEntry* new_job = CreateNewJob(
710       TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI);
711   new_job->task = base::Bind(
712       &DriveServiceInterface::GetResourceListInDirectoryByWapi,
713       base::Unretained(drive_service_),
714       directory_resource_id,
715       base::Bind(&JobScheduler::OnGetResourceListJobDone,
716                  weak_ptr_factory_.GetWeakPtr(),
717                  new_job->job_info.job_id,
718                  callback));
719   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
720   StartJob(new_job);
721 }
722
723 void JobScheduler::GetRemainingResourceList(
724     const GURL& next_link,
725     const google_apis::GetResourceListCallback& callback) {
726   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
727   DCHECK(!callback.is_null());
728
729   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_RESOURCE_LIST);
730   new_job->task = base::Bind(
731       &DriveServiceInterface::GetRemainingResourceList,
732       base::Unretained(drive_service_),
733       next_link,
734       base::Bind(&JobScheduler::OnGetResourceListJobDone,
735                  weak_ptr_factory_.GetWeakPtr(),
736                  new_job->job_info.job_id,
737                  callback));
738   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
739   StartJob(new_job);
740 }
741
742 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
743   JobEntry* job = new JobEntry(type);
744   job->job_info.job_id = job_map_.Add(job);  // Takes the ownership of |job|.
745   return job;
746 }
747
748 void JobScheduler::StartJob(JobEntry* job) {
749   DCHECK(!job->task.is_null());
750
751   QueueJob(job->job_info.job_id);
752   NotifyJobAdded(job->job_info);
753   DoJobLoop(GetJobQueueType(job->job_info.job_type));
754 }
755
756 void JobScheduler::QueueJob(JobID job_id) {
757   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
758
759   JobEntry* job_entry = job_map_.Lookup(job_id);
760   DCHECK(job_entry);
761   const JobInfo& job_info = job_entry->job_info;
762
763   QueueType queue_type = GetJobQueueType(job_info.job_type);
764   queue_[queue_type]->Push(job_id, job_entry->context.type);
765
766   const std::string retry_prefix = job_entry->retry_count > 0 ?
767       base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
768   util::Log(logging::LOG_INFO,
769             "Job queued%s: %s - %s",
770             retry_prefix.c_str(),
771             job_info.ToString().c_str(),
772             GetQueueInfo(queue_type).c_str());
773 }
774
775 void JobScheduler::DoJobLoop(QueueType queue_type) {
776   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
777
778   const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
779
780   // Abort all USER_INITAITED jobs when not accepted.
781   if (accepted_priority < USER_INITIATED) {
782     std::vector<JobID> jobs;
783     queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
784     for (size_t i = 0; i < jobs.size(); ++i) {
785       JobEntry* job = job_map_.Lookup(jobs[i]);
786       DCHECK(job);
787       AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
788     }
789   }
790
791   // Wait when throttled.
792   const base::Time now = base::Time::Now();
793   if (now < wait_until_) {
794     base::MessageLoopProxy::current()->PostDelayedTask(
795         FROM_HERE,
796         base::Bind(&JobScheduler::DoJobLoop,
797                    weak_ptr_factory_.GetWeakPtr(),
798                    queue_type),
799         wait_until_ - now);
800     return;
801   }
802
803   // Run the job with the highest priority in the queue.
804   JobID job_id = -1;
805   if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
806     return;
807
808   JobEntry* entry = job_map_.Lookup(job_id);
809   DCHECK(entry);
810
811   JobInfo* job_info = &entry->job_info;
812   job_info->state = STATE_RUNNING;
813   job_info->start_time = now;
814   NotifyJobUpdated(*job_info);
815
816   entry->cancel_callback = entry->task.Run();
817
818   UpdateWait();
819
820   util::Log(logging::LOG_INFO,
821             "Job started: %s - %s",
822             job_info->ToString().c_str(),
823             GetQueueInfo(queue_type).c_str());
824 }
825
826 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
827   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
828
829   const int kNoJobShouldRun = -1;
830
831   // Should stop if Drive was disabled while running the fetch loop.
832   if (pref_service_->GetBoolean(prefs::kDisableDrive))
833     return kNoJobShouldRun;
834
835   // Should stop if the network is not online.
836   if (net::NetworkChangeNotifier::IsOffline())
837     return kNoJobShouldRun;
838
839   // For the file queue, if it is on cellular network, only user initiated
840   // operations are allowed to start.
841   if (queue_type == FILE_QUEUE &&
842       pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
843       net::NetworkChangeNotifier::IsConnectionCellular(
844           net::NetworkChangeNotifier::GetConnectionType()))
845     return USER_INITIATED;
846
847   // Otherwise, every operations including background tasks are allowed.
848   return BACKGROUND;
849 }
850
851 void JobScheduler::UpdateWait() {
852   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
853
854   if (disable_throttling_ || throttle_count_ == 0)
855     return;
856
857   // Exponential backoff: https://developers.google.com/drive/handle-errors.
858   base::TimeDelta delay =
859       base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
860       base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
861   VLOG(1) << "Throttling for " << delay.InMillisecondsF();
862
863   wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
864 }
865
866 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
867   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
868
869   JobEntry* job_entry = job_map_.Lookup(job_id);
870   DCHECK(job_entry);
871   JobInfo* job_info = &job_entry->job_info;
872   QueueType queue_type = GetJobQueueType(job_info->job_type);
873   queue_[queue_type]->MarkFinished(job_id);
874
875   const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
876   bool success = (GDataToFileError(error) == FILE_ERROR_OK);
877   util::Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
878             "Job done: %s => %s (elapsed time: %sms) - %s",
879             job_info->ToString().c_str(),
880             GDataErrorCodeToString(error).c_str(),
881             base::Int64ToString(elapsed.InMilliseconds()).c_str(),
882             GetQueueInfo(queue_type).c_str());
883
884   // Retry, depending on the error.
885   const bool is_server_error =
886       error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
887       error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
888   if (is_server_error) {
889     if (throttle_count_ < kMaxThrottleCount)
890       ++throttle_count_;
891     UpdateWait();
892   } else {
893     throttle_count_ = 0;
894   }
895
896   const bool should_retry =
897       is_server_error && job_entry->retry_count < kMaxRetryCount;
898   if (should_retry) {
899     job_entry->cancel_callback.Reset();
900     job_info->state = STATE_RETRY;
901     NotifyJobUpdated(*job_info);
902
903     ++job_entry->retry_count;
904
905     // Requeue the job.
906     QueueJob(job_id);
907   } else {
908     NotifyJobDone(*job_info, error);
909     // The job has finished, no retry will happen in the scheduler. Now we can
910     // remove the job info from the map.
911     job_map_.Remove(job_id);
912   }
913
914   // Post a task to continue the job loop.  This allows us to finish handling
915   // the current job before starting the next one.
916   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
917       base::Bind(&JobScheduler::DoJobLoop,
918                  weak_ptr_factory_.GetWeakPtr(),
919                  queue_type));
920   return !should_retry;
921 }
922
923 void JobScheduler::OnGetResourceListJobDone(
924     JobID job_id,
925     const google_apis::GetResourceListCallback& callback,
926     google_apis::GDataErrorCode error,
927     scoped_ptr<google_apis::ResourceList> resource_list) {
928   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
929   DCHECK(!callback.is_null());
930
931   if (OnJobDone(job_id, error))
932     callback.Run(error, resource_list.Pass());
933 }
934
935 void JobScheduler::OnGetResourceEntryJobDone(
936     JobID job_id,
937     const google_apis::GetResourceEntryCallback& callback,
938     google_apis::GDataErrorCode error,
939     scoped_ptr<google_apis::ResourceEntry> entry) {
940   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
941   DCHECK(!callback.is_null());
942
943   if (OnJobDone(job_id, error))
944     callback.Run(error, entry.Pass());
945 }
946
947 void JobScheduler::OnGetAboutResourceJobDone(
948     JobID job_id,
949     const google_apis::AboutResourceCallback& callback,
950     google_apis::GDataErrorCode error,
951     scoped_ptr<google_apis::AboutResource> about_resource) {
952   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
953   DCHECK(!callback.is_null());
954
955   if (OnJobDone(job_id, error))
956     callback.Run(error, about_resource.Pass());
957 }
958
959 void JobScheduler::OnGetShareUrlJobDone(
960     JobID job_id,
961     const google_apis::GetShareUrlCallback& callback,
962     google_apis::GDataErrorCode error,
963     const GURL& share_url) {
964   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
965   DCHECK(!callback.is_null());
966
967   if (OnJobDone(job_id, error))
968     callback.Run(error, share_url);
969 }
970
971 void JobScheduler::OnGetAppListJobDone(
972     JobID job_id,
973     const google_apis::AppListCallback& callback,
974     google_apis::GDataErrorCode error,
975     scoped_ptr<google_apis::AppList> app_list) {
976   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
977   DCHECK(!callback.is_null());
978
979   if (OnJobDone(job_id, error))
980     callback.Run(error, app_list.Pass());
981 }
982
983 void JobScheduler::OnEntryActionJobDone(
984     JobID job_id,
985     const google_apis::EntryActionCallback& callback,
986     google_apis::GDataErrorCode error) {
987   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
988   DCHECK(!callback.is_null());
989
990   if (OnJobDone(job_id, error))
991     callback.Run(error);
992 }
993
994 void JobScheduler::OnDownloadActionJobDone(
995     JobID job_id,
996     const google_apis::DownloadActionCallback& callback,
997     google_apis::GDataErrorCode error,
998     const base::FilePath& temp_file) {
999   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1000   DCHECK(!callback.is_null());
1001
1002   if (OnJobDone(job_id, error))
1003     callback.Run(error, temp_file);
1004 }
1005
1006 void JobScheduler::OnUploadCompletionJobDone(
1007     JobID job_id,
1008     const ResumeUploadParams& resume_params,
1009     const google_apis::GetResourceEntryCallback& callback,
1010     google_apis::GDataErrorCode error,
1011     const GURL& upload_location,
1012     scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1013   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1014   DCHECK(!callback.is_null());
1015
1016   if (!upload_location.is_empty()) {
1017     // If upload_location is available, update the task to resume the
1018     // upload process from the terminated point.
1019     // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1020     // so OnJobDone called below will be in charge to re-queue the job.
1021     JobEntry* job_entry = job_map_.Lookup(job_id);
1022     DCHECK(job_entry);
1023
1024     ResumeUploadFileParams params;
1025     params.upload_location = upload_location;
1026     params.local_file_path = resume_params.local_file_path;
1027     params.content_type = resume_params.content_type;
1028     params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1029                                  weak_ptr_factory_.GetWeakPtr(),
1030                                  job_id,
1031                                  job_entry->task,
1032                                  callback);
1033     params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1034                                           weak_ptr_factory_.GetWeakPtr(),
1035                                           job_id);
1036     job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1037   }
1038
1039   if (OnJobDone(job_id, error))
1040     callback.Run(error, resource_entry.Pass());
1041 }
1042
1043 void JobScheduler::OnResumeUploadFileDone(
1044     JobID job_id,
1045     const base::Callback<google_apis::CancelCallback()>& original_task,
1046     const google_apis::GetResourceEntryCallback& callback,
1047     google_apis::GDataErrorCode error,
1048     const GURL& upload_location,
1049     scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1050   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1051   DCHECK(!original_task.is_null());
1052   DCHECK(!callback.is_null());
1053
1054   if (upload_location.is_empty()) {
1055     // If upload_location is not available, we should discard it and stop trying
1056     // to resume. Restore the original task.
1057     JobEntry* job_entry = job_map_.Lookup(job_id);
1058     DCHECK(job_entry);
1059     job_entry->task = original_task;
1060   }
1061
1062   if (OnJobDone(job_id, error))
1063     callback.Run(error, resource_entry.Pass());
1064 }
1065
1066 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1067   JobEntry* job_entry = job_map_.Lookup(job_id);
1068   DCHECK(job_entry);
1069
1070   job_entry->job_info.num_completed_bytes = progress;
1071   if (total != -1)
1072     job_entry->job_info.num_total_bytes = total;
1073   NotifyJobUpdated(job_entry->job_info);
1074 }
1075
1076 void JobScheduler::OnConnectionTypeChanged(
1077     net::NetworkChangeNotifier::ConnectionType type) {
1078   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1079
1080   // Resume the job loop.
1081   // Note that we don't need to check the network connection status as it will
1082   // be checked in GetCurrentAcceptedPriority().
1083   for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1084     DoJobLoop(static_cast<QueueType>(i));
1085 }
1086
1087 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1088   switch (type) {
1089     case TYPE_GET_ABOUT_RESOURCE:
1090     case TYPE_GET_APP_LIST:
1091     case TYPE_GET_ALL_RESOURCE_LIST:
1092     case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1093     case TYPE_SEARCH:
1094     case TYPE_GET_CHANGE_LIST:
1095     case TYPE_GET_REMAINING_CHANGE_LIST:
1096     case TYPE_GET_REMAINING_FILE_LIST:
1097     case TYPE_GET_SHARE_URL:
1098     case TYPE_DELETE_RESOURCE:
1099     case TYPE_COPY_RESOURCE:
1100     case TYPE_COPY_HOSTED_DOCUMENT:
1101     case TYPE_MOVE_RESOURCE:
1102     case TYPE_RENAME_RESOURCE:
1103     case TYPE_TOUCH_RESOURCE:
1104     case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1105     case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1106     case TYPE_ADD_NEW_DIRECTORY:
1107     case TYPE_CREATE_FILE:
1108     case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI:
1109     case TYPE_GET_REMAINING_RESOURCE_LIST:
1110       return METADATA_QUEUE;
1111
1112     case TYPE_DOWNLOAD_FILE:
1113     case TYPE_UPLOAD_NEW_FILE:
1114     case TYPE_UPLOAD_EXISTING_FILE:
1115       return FILE_QUEUE;
1116   }
1117   NOTREACHED();
1118   return FILE_QUEUE;
1119 }
1120
1121 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1122                                       google_apis::GDataErrorCode error) {
1123   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1124
1125   const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1126   const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1127   util::Log(logging::LOG_INFO,
1128             "Job aborted: %s => %s (elapsed time: %sms) - %s",
1129             job->job_info.ToString().c_str(),
1130             GDataErrorCodeToString(error).c_str(),
1131             base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1132             GetQueueInfo(queue_type).c_str());
1133
1134   base::Callback<void(google_apis::GDataErrorCode)> callback =
1135       job->abort_callback;
1136   queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1137   NotifyJobDone(job->job_info, error);
1138   job_map_.Remove(job->job_info.job_id);
1139   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1140                                               base::Bind(callback, error));
1141 }
1142
1143 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1144   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1145   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1146 }
1147
1148 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1149                                  google_apis::GDataErrorCode error) {
1150   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1151   FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1152                     OnJobDone(job_info, GDataToFileError(error)));
1153 }
1154
1155 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1156   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1157   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1158 }
1159
1160 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1161   return QueueTypeToString(type) + " " + queue_[type]->ToString();
1162 }
1163
1164 // static
1165 std::string JobScheduler::QueueTypeToString(QueueType type) {
1166   switch (type) {
1167     case METADATA_QUEUE:
1168       return "METADATA_QUEUE";
1169     case FILE_QUEUE:
1170       return "FILE_QUEUE";
1171     case NUM_QUEUES:
1172       break;  // This value is just a sentinel. Should never be used.
1173   }
1174   NOTREACHED();
1175   return "";
1176 }
1177
1178 }  // namespace drive