Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / sync / internal_api / attachments / attachment_service_impl.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/internal_api/public/attachments/attachment_service_impl.h"
6
7 #include <iterator>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11 #include "base/thread_task_runner_handle.h"
12 #include "base/time/time.h"
13 #include "sync/api/attachments/attachment.h"
14 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
15 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h"
16
17 namespace syncer {
18
19 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls.
20 // GetOrDownloadState tracks completion of these calls and posts callback for
21 // consumer once all attachments are either retrieved or reported unavailable.
22 class AttachmentServiceImpl::GetOrDownloadState
23     : public base::RefCounted<GetOrDownloadState>,
24       public base::NonThreadSafe {
25  public:
26   // GetOrDownloadState gets parameter from values passed to
27   // AttachmentService::GetOrDownloadAttachments.
28   // |attachment_ids| is a list of attachmens to retrieve.
29   // |callback| will be posted on current thread when all attachments retrieved
30   // or confirmed unavailable.
31   GetOrDownloadState(const AttachmentIdList& attachment_ids,
32                      const GetOrDownloadCallback& callback);
33
34   // Attachment was just retrieved. Add it to retrieved attachments.
35   void AddAttachment(const Attachment& attachment);
36
37   // Both reading from local store and downloading attachment failed.
38   // Add it to unavailable set.
39   void AddUnavailableAttachmentId(const AttachmentId& attachment_id);
40
41  private:
42   friend class base::RefCounted<GetOrDownloadState>;
43   virtual ~GetOrDownloadState();
44
45   // If all attachment requests completed then post callback to consumer with
46   // results.
47   void PostResultIfAllRequestsCompleted();
48
49   GetOrDownloadCallback callback_;
50
51   // Requests for these attachments are still in progress.
52   AttachmentIdSet in_progress_attachments_;
53
54   AttachmentIdSet unavailable_attachments_;
55   scoped_ptr<AttachmentMap> retrieved_attachments_;
56
57   DISALLOW_COPY_AND_ASSIGN(GetOrDownloadState);
58 };
59
60 AttachmentServiceImpl::GetOrDownloadState::GetOrDownloadState(
61     const AttachmentIdList& attachment_ids,
62     const GetOrDownloadCallback& callback)
63     : callback_(callback), retrieved_attachments_(new AttachmentMap()) {
64   std::copy(
65       attachment_ids.begin(),
66       attachment_ids.end(),
67       std::inserter(in_progress_attachments_, in_progress_attachments_.end()));
68   PostResultIfAllRequestsCompleted();
69 }
70
71 AttachmentServiceImpl::GetOrDownloadState::~GetOrDownloadState() {
72   DCHECK(CalledOnValidThread());
73 }
74
75 void AttachmentServiceImpl::GetOrDownloadState::AddAttachment(
76     const Attachment& attachment) {
77   DCHECK(CalledOnValidThread());
78   DCHECK(retrieved_attachments_->find(attachment.GetId()) ==
79          retrieved_attachments_->end());
80   retrieved_attachments_->insert(
81       std::make_pair(attachment.GetId(), attachment));
82   DCHECK(in_progress_attachments_.find(attachment.GetId()) !=
83          in_progress_attachments_.end());
84   in_progress_attachments_.erase(attachment.GetId());
85   PostResultIfAllRequestsCompleted();
86 }
87
88 void AttachmentServiceImpl::GetOrDownloadState::AddUnavailableAttachmentId(
89     const AttachmentId& attachment_id) {
90   DCHECK(CalledOnValidThread());
91   DCHECK(unavailable_attachments_.find(attachment_id) ==
92          unavailable_attachments_.end());
93   unavailable_attachments_.insert(attachment_id);
94   DCHECK(in_progress_attachments_.find(attachment_id) !=
95          in_progress_attachments_.end());
96   in_progress_attachments_.erase(attachment_id);
97   PostResultIfAllRequestsCompleted();
98 }
99
100 void
101 AttachmentServiceImpl::GetOrDownloadState::PostResultIfAllRequestsCompleted() {
102   if (in_progress_attachments_.empty()) {
103     // All requests completed. Let's notify consumer.
104     GetOrDownloadResult result =
105         unavailable_attachments_.empty() ? GET_SUCCESS : GET_UNSPECIFIED_ERROR;
106     base::MessageLoop::current()->PostTask(
107         FROM_HERE,
108         base::Bind(callback_, result, base::Passed(&retrieved_attachments_)));
109   }
110 }
111
112 AttachmentServiceImpl::AttachmentServiceImpl(
113     scoped_refptr<AttachmentStore> attachment_store,
114     scoped_ptr<AttachmentUploader> attachment_uploader,
115     scoped_ptr<AttachmentDownloader> attachment_downloader,
116     Delegate* delegate,
117     const base::TimeDelta& initial_backoff_delay,
118     const base::TimeDelta& max_backoff_delay)
119     : attachment_store_(attachment_store),
120       attachment_uploader_(attachment_uploader.Pass()),
121       attachment_downloader_(attachment_downloader.Pass()),
122       delegate_(delegate),
123       weak_ptr_factory_(this) {
124   DCHECK(CalledOnValidThread());
125   DCHECK(attachment_store_.get());
126
127   // TODO(maniscalco): Observe network connectivity change events.  When the
128   // network becomes disconnected, consider suspending queue dispatch.  When
129   // connectivity is restored, consider clearing any dispatch backoff (bug
130   // 411981).
131   upload_task_queue_.reset(new TaskQueue<AttachmentId>(
132       base::Bind(&AttachmentServiceImpl::BeginUpload,
133                  weak_ptr_factory_.GetWeakPtr()),
134       initial_backoff_delay,
135       max_backoff_delay));
136
137   net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
138 }
139
140 AttachmentServiceImpl::~AttachmentServiceImpl() {
141   DCHECK(CalledOnValidThread());
142   net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
143 }
144
145 // Static.
146 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
147   scoped_refptr<syncer::AttachmentStore> attachment_store =
148       AttachmentStore::CreateInMemoryStore();
149   scoped_ptr<AttachmentUploader> attachment_uploader(
150       new FakeAttachmentUploader);
151   scoped_ptr<AttachmentDownloader> attachment_downloader(
152       new FakeAttachmentDownloader());
153   scoped_ptr<syncer::AttachmentService> attachment_service(
154       new syncer::AttachmentServiceImpl(attachment_store,
155                                         attachment_uploader.Pass(),
156                                         attachment_downloader.Pass(),
157                                         NULL,
158                                         base::TimeDelta(),
159                                         base::TimeDelta()));
160   return attachment_service.Pass();
161 }
162
163 AttachmentStore* AttachmentServiceImpl::GetStore() {
164   return attachment_store_.get();
165 }
166
167 void AttachmentServiceImpl::GetOrDownloadAttachments(
168     const AttachmentIdList& attachment_ids,
169     const GetOrDownloadCallback& callback) {
170   DCHECK(CalledOnValidThread());
171   scoped_refptr<GetOrDownloadState> state(
172       new GetOrDownloadState(attachment_ids, callback));
173   attachment_store_->Read(attachment_ids,
174                           base::Bind(&AttachmentServiceImpl::ReadDone,
175                                      weak_ptr_factory_.GetWeakPtr(),
176                                      state));
177 }
178
179 void AttachmentServiceImpl::DropAttachments(
180     const AttachmentIdList& attachment_ids,
181     const DropCallback& callback) {
182   DCHECK(CalledOnValidThread());
183   attachment_store_->Drop(attachment_ids,
184                           base::Bind(&AttachmentServiceImpl::DropDone,
185                                      weak_ptr_factory_.GetWeakPtr(),
186                                      callback));
187 }
188
189 void AttachmentServiceImpl::ReadDone(
190     const scoped_refptr<GetOrDownloadState>& state,
191     const AttachmentStore::Result& result,
192     scoped_ptr<AttachmentMap> attachments,
193     scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
194   // Add read attachments to result.
195   for (AttachmentMap::const_iterator iter = attachments->begin();
196        iter != attachments->end();
197        ++iter) {
198     state->AddAttachment(iter->second);
199   }
200
201   AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
202   AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
203   if (attachment_downloader_.get()) {
204     // Try to download locally unavailable attachments.
205     for (; iter != end; ++iter) {
206       attachment_downloader_->DownloadAttachment(
207           *iter,
208           base::Bind(&AttachmentServiceImpl::DownloadDone,
209                      weak_ptr_factory_.GetWeakPtr(),
210                      state,
211                      *iter));
212     }
213   } else {
214     // No downloader so all locally unavailable attachments are unavailable.
215     for (; iter != end; ++iter) {
216       state->AddUnavailableAttachmentId(*iter);
217     }
218   }
219 }
220
221 void AttachmentServiceImpl::DropDone(const DropCallback& callback,
222                                      const AttachmentStore::Result& result) {
223   AttachmentService::DropResult drop_result =
224       AttachmentService::DROP_UNSPECIFIED_ERROR;
225   if (result == AttachmentStore::SUCCESS) {
226     drop_result = AttachmentService::DROP_SUCCESS;
227   }
228   // TODO(maniscalco): Deal with case where an error occurred (bug 361251).
229   base::MessageLoop::current()->PostTask(FROM_HERE,
230                                          base::Bind(callback, drop_result));
231 }
232
233 void AttachmentServiceImpl::UploadDone(
234     const AttachmentUploader::UploadResult& result,
235     const AttachmentId& attachment_id) {
236   DCHECK(CalledOnValidThread());
237   switch (result) {
238     case AttachmentUploader::UPLOAD_SUCCESS:
239       upload_task_queue_->MarkAsSucceeded(attachment_id);
240       if (delegate_) {
241         delegate_->OnAttachmentUploaded(attachment_id);
242       }
243       break;
244     case AttachmentUploader::UPLOAD_TRANSIENT_ERROR:
245       upload_task_queue_->MarkAsFailed(attachment_id);
246       upload_task_queue_->AddToQueue(attachment_id);
247       break;
248     case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR:
249       // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
250       upload_task_queue_->MarkAsFailed(attachment_id);
251       break;
252   }
253 }
254
255 void AttachmentServiceImpl::DownloadDone(
256     const scoped_refptr<GetOrDownloadState>& state,
257     const AttachmentId& attachment_id,
258     const AttachmentDownloader::DownloadResult& result,
259     scoped_ptr<Attachment> attachment) {
260   switch (result) {
261     case AttachmentDownloader::DOWNLOAD_SUCCESS:
262       state->AddAttachment(*attachment.get());
263       break;
264     case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR:
265     case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR:
266       state->AddUnavailableAttachmentId(attachment_id);
267       break;
268   }
269 }
270
271 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) {
272   DCHECK(CalledOnValidThread());
273   AttachmentIdList attachment_ids;
274   attachment_ids.push_back(attachment_id);
275   attachment_store_->Read(attachment_ids,
276                           base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
277                                      weak_ptr_factory_.GetWeakPtr()));
278 }
279
280 void AttachmentServiceImpl::UploadAttachments(
281     const AttachmentIdSet& attachment_ids) {
282   DCHECK(CalledOnValidThread());
283   if (!attachment_uploader_.get()) {
284     return;
285   }
286   AttachmentIdSet::const_iterator iter = attachment_ids.begin();
287   AttachmentIdSet::const_iterator end = attachment_ids.end();
288   for (; iter != end; ++iter) {
289     upload_task_queue_->AddToQueue(*iter);
290   }
291 }
292
293 void AttachmentServiceImpl::OnNetworkChanged(
294     net::NetworkChangeNotifier::ConnectionType type) {
295   if (type != net::NetworkChangeNotifier::CONNECTION_NONE) {
296     upload_task_queue_->ResetBackoff();
297   }
298 }
299
300 void AttachmentServiceImpl::ReadDoneNowUpload(
301     const AttachmentStore::Result& result,
302     scoped_ptr<AttachmentMap> attachments,
303     scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
304   DCHECK(CalledOnValidThread());
305   if (!unavailable_attachment_ids->empty()) {
306     // TODO(maniscalco): We failed to read some attachments. What should we do
307     // now?
308     AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
309     AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
310     for (; iter != end; ++iter) {
311       upload_task_queue_->Cancel(*iter);
312     }
313   }
314
315   AttachmentMap::const_iterator iter = attachments->begin();
316   AttachmentMap::const_iterator end = attachments->end();
317   for (; iter != end; ++iter) {
318     attachment_uploader_->UploadAttachment(
319         iter->second,
320         base::Bind(&AttachmentServiceImpl::UploadDone,
321                    weak_ptr_factory_.GetWeakPtr()));
322   }
323 }
324
325 void AttachmentServiceImpl::SetTimerForTest(scoped_ptr<base::Timer> timer) {
326   upload_task_queue_->SetTimerForTest(timer.Pass());
327 }
328
329 }  // namespace syncer