Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / chrome / browser / sync_file_system / drive_backend / sync_task_manager.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/memory/scoped_ptr.h"
10 #include "base/sequenced_task_runner.h"
11 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
12 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
13 #include "chrome/browser/sync_file_system/sync_file_metadata.h"
14
15 using fileapi::FileSystemURL;
16
17 namespace sync_file_system {
18 namespace drive_backend {
19
20 namespace {
21
22 class SyncTaskAdapter : public ExclusiveTask {
23  public:
24   explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
25   virtual ~SyncTaskAdapter() {}
26
27   virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE {
28     task_.Run(callback);
29   }
30
31  private:
32   SyncTaskManager::Task task_;
33
34   DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
35 };
36
37 }  // namespace
38
39 SyncTaskManager::PendingTask::PendingTask() {}
40
41 SyncTaskManager::PendingTask::PendingTask(
42     const base::Closure& task, Priority pri, int seq)
43     : task(task), priority(pri), seq(seq) {}
44
45 SyncTaskManager::PendingTask::~PendingTask() {}
46
47 bool SyncTaskManager::PendingTaskComparator::operator()(
48     const PendingTask& left,
49     const PendingTask& right) const {
50   if (left.priority != right.priority)
51     return left.priority < right.priority;
52   return left.seq > right.seq;
53 }
54
55 SyncTaskManager::SyncTaskManager(
56     base::WeakPtr<Client> client,
57     size_t maximum_background_task,
58     base::SequencedTaskRunner* task_runner)
59     : client_(client),
60       maximum_background_task_(maximum_background_task),
61       pending_task_seq_(0),
62       task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
63       task_runner_(task_runner) {
64 }
65
66 SyncTaskManager::~SyncTaskManager() {
67   client_.reset();
68   token_.reset();
69 }
70
71 void SyncTaskManager::Initialize(SyncStatusCode status) {
72   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
73   DCHECK(!token_);
74   NotifyTaskDone(
75       SyncTaskToken::CreateForForegroundTask(AsWeakPtr(), task_runner_),
76       status);
77 }
78
79 void SyncTaskManager::ScheduleTask(
80     const tracked_objects::Location& from_here,
81     const Task& task,
82     Priority priority,
83     const SyncStatusCallback& callback) {
84   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
85
86   ScheduleSyncTask(from_here,
87                    scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
88                    priority,
89                    callback);
90 }
91
92 void SyncTaskManager::ScheduleSyncTask(
93     const tracked_objects::Location& from_here,
94     scoped_ptr<SyncTask> task,
95     Priority priority,
96     const SyncStatusCallback& callback) {
97   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
98
99   scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
100   if (!token) {
101     PushPendingTask(
102         base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
103                    base::Passed(&task), priority, callback),
104         priority);
105     return;
106   }
107   RunTask(token.Pass(), task.Pass());
108 }
109
110 bool SyncTaskManager::ScheduleTaskIfIdle(
111         const tracked_objects::Location& from_here,
112         const Task& task,
113         const SyncStatusCallback& callback) {
114   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
115
116   return ScheduleSyncTaskIfIdle(
117       from_here,
118       scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
119       callback);
120 }
121
122 bool SyncTaskManager::ScheduleSyncTaskIfIdle(
123     const tracked_objects::Location& from_here,
124     scoped_ptr<SyncTask> task,
125     const SyncStatusCallback& callback) {
126   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
127
128   scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
129   if (!token)
130     return false;
131   RunTask(token.Pass(), task.Pass());
132   return true;
133 }
134
135 // static
136 void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
137                                      SyncStatusCode status) {
138   DCHECK(token);
139
140   SyncTaskManager* manager = token->manager();
141   if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
142     DCHECK(!manager);
143     SyncStatusCallback callback = token->callback();
144     token->clear_callback();
145     callback.Run(status);
146     return;
147   }
148
149   if (manager)
150     manager->NotifyTaskDoneBody(token.Pass(), status);
151 }
152
153 // static
154 void SyncTaskManager::UpdateBlockingFactor(
155     scoped_ptr<SyncTaskToken> current_task_token,
156     scoped_ptr<BlockingFactor> blocking_factor,
157     const Continuation& continuation) {
158   DCHECK(current_task_token);
159
160   SyncTaskManager* manager = current_task_token->manager();
161   if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
162     DCHECK(!manager);
163     continuation.Run(current_task_token.Pass());
164     return;
165   }
166
167   if (!manager)
168     return;
169
170   scoped_ptr<SyncTaskToken> foreground_task_token;
171   scoped_ptr<SyncTaskToken> background_task_token;
172   scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
173   if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
174     foreground_task_token = current_task_token.Pass();
175   else
176     background_task_token = current_task_token.Pass();
177
178   manager->UpdateBlockingFactorBody(foreground_task_token.Pass(),
179                                     background_task_token.Pass(),
180                                     task_log.Pass(),
181                                     blocking_factor.Pass(),
182                                     continuation);
183 }
184
185 bool SyncTaskManager::IsRunningTask(int64 token_id) const {
186   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
187
188   // If the client is gone, all task should be aborted.
189   if (!client_)
190     return false;
191
192   if (token_id == SyncTaskToken::kForegroundTaskTokenID)
193     return true;
194
195   return ContainsKey(running_background_tasks_, token_id);
196 }
197
198 void SyncTaskManager::DetachFromSequence() {
199   sequence_checker_.DetachFromSequence();
200 }
201
202 void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
203                                          SyncStatusCode status) {
204   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
205   DCHECK(token);
206
207   DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
208            << " (" << SyncStatusCodeToString(status) << ")"
209            << " " << token_->location().ToString();
210
211   if (token->blocking_factor()) {
212     dependency_manager_.Erase(token->blocking_factor());
213     token->clear_blocking_factor();
214   }
215
216   if (client_) {
217     if (token->has_task_log()) {
218       token->FinalizeTaskLog(SyncStatusCodeToString(status));
219       client_->RecordTaskLog(token->PassTaskLog());
220     }
221   }
222
223   scoped_ptr<SyncTask> task;
224   SyncStatusCallback callback = token->callback();
225   token->clear_callback();
226   if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
227     token_ = token.Pass();
228     task = running_foreground_task_.Pass();
229   } else {
230     task = running_background_tasks_.take_and_erase(token->token_id());
231   }
232
233   // Acquire the token to prevent a new task to jump into the queue.
234   token = token_.Pass();
235
236   bool task_used_network = false;
237   if (task)
238     task_used_network = task->used_network();
239
240   if (client_)
241     client_->NotifyLastOperationStatus(status, task_used_network);
242
243   if (!callback.is_null())
244     callback.Run(status);
245
246   // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
247   // making the call-chaing longer.
248   task_runner_->PostTask(
249       FROM_HERE,
250       base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask,
251                  AsWeakPtr(), base::Passed(&token)));
252 }
253
254 void SyncTaskManager::UpdateBlockingFactorBody(
255     scoped_ptr<SyncTaskToken> foreground_task_token,
256     scoped_ptr<SyncTaskToken> background_task_token,
257     scoped_ptr<TaskLogger::TaskLog> task_log,
258     scoped_ptr<BlockingFactor> blocking_factor,
259     const Continuation& continuation) {
260   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
261
262   // Run the task directly if the parallelization is disabled.
263   if (!maximum_background_task_) {
264     DCHECK(foreground_task_token);
265     DCHECK(!background_task_token);
266     foreground_task_token->SetTaskLog(task_log.Pass());
267     continuation.Run(foreground_task_token.Pass());
268     return;
269   }
270
271   // Clear existing |blocking_factor| from |dependency_manager_| before
272   // getting |foreground_task_token|, so that we can avoid dead lock.
273   if (background_task_token && background_task_token->blocking_factor()) {
274     dependency_manager_.Erase(background_task_token->blocking_factor());
275     background_task_token->clear_blocking_factor();
276   }
277
278   // Try to get |foreground_task_token|.  If it's not available, wait for
279   // current foreground task to finish.
280   if (!foreground_task_token) {
281     DCHECK(background_task_token);
282     foreground_task_token = GetToken(background_task_token->location(),
283                                      SyncStatusCallback());
284     if (!foreground_task_token) {
285       PushPendingTask(
286           base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
287                      AsWeakPtr(),
288                      base::Passed(&foreground_task_token),
289                      base::Passed(&background_task_token),
290                      base::Passed(&task_log),
291                      base::Passed(&blocking_factor),
292                      continuation),
293           PRIORITY_HIGH);
294       MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
295       return;
296     }
297   }
298
299   // Check if the task can run as a background task now.
300   // If there are too many task running or any other task blocks current
301   // task, wait for any other task to finish.
302   bool task_number_limit_exceeded =
303       !background_task_token &&
304       running_background_tasks_.size() >= maximum_background_task_;
305   if (task_number_limit_exceeded ||
306       !dependency_manager_.Insert(blocking_factor.get())) {
307     DCHECK(!running_background_tasks_.empty());
308     DCHECK(pending_backgrounding_task_.is_null());
309
310     // Wait for NotifyTaskDone to release a |blocking_factor|.
311     pending_backgrounding_task_ =
312         base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
313                    AsWeakPtr(),
314                    base::Passed(&foreground_task_token),
315                    base::Passed(&background_task_token),
316                    base::Passed(&task_log),
317                    base::Passed(&blocking_factor),
318                    continuation);
319     return;
320   }
321
322   if (background_task_token) {
323     background_task_token->set_blocking_factor(blocking_factor.Pass());
324   } else {
325     tracked_objects::Location from_here = foreground_task_token->location();
326     SyncStatusCallback callback = foreground_task_token->callback();
327     foreground_task_token->clear_callback();
328
329     background_task_token =
330         SyncTaskToken::CreateForBackgroundTask(
331             AsWeakPtr(),
332             task_runner_,
333             task_token_seq_++,
334             blocking_factor.Pass());
335     background_task_token->UpdateTask(from_here, callback);
336     running_background_tasks_.set(background_task_token->token_id(),
337                                   running_foreground_task_.Pass());
338   }
339
340   token_ = foreground_task_token.Pass();
341   MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
342   background_task_token->SetTaskLog(task_log.Pass());
343   continuation.Run(background_task_token.Pass());
344 }
345
346 scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
347     const tracked_objects::Location& from_here,
348     const SyncStatusCallback& callback) {
349   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
350
351   if (!token_)
352     return scoped_ptr<SyncTaskToken>();
353   token_->UpdateTask(from_here, callback);
354   return token_.Pass();
355 }
356
357 void SyncTaskManager::PushPendingTask(
358     const base::Closure& closure, Priority priority) {
359   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
360
361   pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
362 }
363
364 void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
365                               scoped_ptr<SyncTask> task) {
366   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
367   DCHECK(!running_foreground_task_);
368
369   running_foreground_task_ = task.Pass();
370   running_foreground_task_->RunPreflight(token.Pass());
371 }
372
373 void SyncTaskManager::MaybeStartNextForegroundTask(
374     scoped_ptr<SyncTaskToken> token) {
375   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
376
377   if (token) {
378     DCHECK(!token_);
379     token_ = token.Pass();
380   }
381
382   if (!pending_backgrounding_task_.is_null()) {
383     base::Closure closure = pending_backgrounding_task_;
384     pending_backgrounding_task_.Reset();
385     closure.Run();
386     return;
387   }
388
389   if (!token_)
390     return;
391
392   if (!pending_tasks_.empty()) {
393     base::Closure closure = pending_tasks_.top().task;
394     pending_tasks_.pop();
395     closure.Run();
396     return;
397   }
398
399   if (client_)
400     client_->MaybeScheduleNextTask();
401 }
402
403 }  // namespace drive_backend
404 }  // namespace sync_file_system