1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/internal_api/public/attachments/attachment_service_impl.h"
10 #include "base/message_loop/message_loop.h"
11 #include "base/thread_task_runner_handle.h"
12 #include "base/time/time.h"
13 #include "sync/api/attachments/attachment.h"
14 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
15 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h"
19 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls.
20 // GetOrDownloadState tracks completion of these calls and posts callback for
21 // consumer once all attachments are either retrieved or reported unavailable.
// Instances are ref-counted (base::RefCounted) and derive from
// base::NonThreadSafe; AddAttachment, AddUnavailableAttachmentId and the
// destructor DCHECK CalledOnValidThread().
22 class AttachmentServiceImpl::GetOrDownloadState
23 : public base::RefCounted<GetOrDownloadState>,
24 public base::NonThreadSafe {
26 // GetOrDownloadState gets parameter from values passed to
27 // AttachmentService::GetOrDownloadAttachments.
28 // |attachment_ids| is a list of attachments to retrieve.
29 // |callback| will be posted on current thread when all attachments retrieved
30 // or confirmed unavailable.
31 GetOrDownloadState(const AttachmentIdList& attachment_ids,
32 const GetOrDownloadCallback& callback);
34 // Attachment was just retrieved. Add it to retrieved attachments.
35 void AddAttachment(const Attachment& attachment);
37 // Both reading from local store and downloading attachment failed.
38 // Add it to unavailable set.
39 void AddUnavailableAttachmentId(const AttachmentId& attachment_id);
// base::RefCounted is a friend so that it can invoke the destructor
// (presumably because the destructor is non-public; the access specifiers
// are not visible in this listing -- confirm).
42 friend class base::RefCounted<GetOrDownloadState>;
43 virtual ~GetOrDownloadState();
45 // If all attachment requests completed then post callback to consumer with
// the overall result and the retrieved attachments.
47 void PostResultIfAllRequestsCompleted();
// Consumer callback; bound and posted by PostResultIfAllRequestsCompleted().
49 GetOrDownloadCallback callback_;
51 // Requests for these attachments are still in progress.
52 AttachmentIdSet in_progress_attachments_;
// Ids recorded through AddUnavailableAttachmentId().
54 AttachmentIdSet unavailable_attachments_;
// Attachments recorded through AddAttachment(). Ownership is handed to the
// bound callback (base::Passed) when the result is posted.
55 scoped_ptr<AttachmentMap> retrieved_attachments_;
57 DISALLOW_COPY_AND_ASSIGN(GetOrDownloadState);
// Stores |callback| and allocates an empty AttachmentMap for the results.
// The visible arguments insert into |in_progress_attachments_| starting at
// attachment_ids.begin(); presumably every requested id is copied in (the
// opening line of that call is not visible in this listing -- confirm).
60 AttachmentServiceImpl::GetOrDownloadState::GetOrDownloadState(
61 const AttachmentIdList& attachment_ids,
62 const GetOrDownloadCallback& callback)
63 : callback_(callback), retrieved_attachments_(new AttachmentMap()) {
65 attachment_ids.begin(),
67 std::inserter(in_progress_attachments_, in_progress_attachments_.end()));
// Check for completion right away: if nothing is in progress (e.g. no ids were
// requested) the callback is posted immediately.
68 PostResultIfAllRequestsCompleted();
// Destructor; DCHECKs that it runs on the valid thread.
71 AttachmentServiceImpl::GetOrDownloadState::~GetOrDownloadState() {
72 DCHECK(CalledOnValidThread());
// Records |attachment| as retrieved, removes its id from the in-progress set
// and then posts the result if no requests remain in progress.
75 void AttachmentServiceImpl::GetOrDownloadState::AddAttachment(
76 const Attachment& attachment) {
77 DCHECK(CalledOnValidThread());
// An attachment must not be reported as retrieved twice.
78 DCHECK(retrieved_attachments_->find(attachment.GetId()) ==
79 retrieved_attachments_->end());
80 retrieved_attachments_->insert(
81 std::make_pair(attachment.GetId(), attachment));
// The id must still be in progress when its attachment arrives.
82 DCHECK(in_progress_attachments_.find(attachment.GetId()) !=
83 in_progress_attachments_.end());
84 in_progress_attachments_.erase(attachment.GetId());
85 PostResultIfAllRequestsCompleted();
// Records |attachment_id| as unavailable, removes it from the in-progress set
// and then posts the result if no requests remain in progress.
88 void AttachmentServiceImpl::GetOrDownloadState::AddUnavailableAttachmentId(
89 const AttachmentId& attachment_id) {
90 DCHECK(CalledOnValidThread());
// An id must not be reported as unavailable twice.
91 DCHECK(unavailable_attachments_.find(attachment_id) ==
92 unavailable_attachments_.end());
93 unavailable_attachments_.insert(attachment_id);
// The id must still be in progress when it is reported unavailable.
94 DCHECK(in_progress_attachments_.find(attachment_id) !=
95 in_progress_attachments_.end());
96 in_progress_attachments_.erase(attachment_id);
97 PostResultIfAllRequestsCompleted();
// Does nothing while requests are still in progress. Once the in-progress set
// is empty, posts |callback_| to the current MessageLoop with GET_SUCCESS when
// no attachment was unavailable and GET_UNSPECIFIED_ERROR otherwise.
101 AttachmentServiceImpl::GetOrDownloadState::PostResultIfAllRequestsCompleted() {
102 if (in_progress_attachments_.empty()) {
103 // All requests completed. Let's notify consumer.
104 GetOrDownloadResult result =
105 unavailable_attachments_.empty() ? GET_SUCCESS : GET_UNSPECIFIED_ERROR;
// base::Passed hands ownership of |retrieved_attachments_| to the bound
// callback.
106 base::MessageLoop::current()->PostTask(
108 base::Bind(callback_, result, base::Passed(&retrieved_attachments_)));
// Takes the attachment store plus ownership of the uploader and downloader,
// creates the upload task queue and registers this object as a network change
// observer. NOTE(review): some parameters and initializers are not visible in
// this listing.
112 AttachmentServiceImpl::AttachmentServiceImpl(
113 scoped_refptr<AttachmentStore> attachment_store,
114 scoped_ptr<AttachmentUploader> attachment_uploader,
115 scoped_ptr<AttachmentDownloader> attachment_downloader,
117 const base::TimeDelta& initial_backoff_delay,
118 const base::TimeDelta& max_backoff_delay)
119 : attachment_store_(attachment_store),
120 attachment_uploader_(attachment_uploader.Pass()),
121 attachment_downloader_(attachment_downloader.Pass()),
123 weak_ptr_factory_(this) {
124 DCHECK(CalledOnValidThread());
// A store is mandatory; the uploader and downloader are null-checked at their
// use sites instead (see UploadAttachments and ReadDone).
125 DCHECK(attachment_store_.get());
127 // TODO(maniscalco): Observe network connectivity change events. When the
128 // network becomes disconnected, consider suspending queue dispatch. When
129 // connectivity is restored, consider clearing any dispatch backoff (bug
// The queue invokes BeginUpload through a weak pointer to this object.
131 upload_task_queue_.reset(new TaskQueue<AttachmentId>(
132 base::Bind(&AttachmentServiceImpl::BeginUpload,
133 weak_ptr_factory_.GetWeakPtr()),
134 initial_backoff_delay,
// Paired with RemoveNetworkChangeObserver in the destructor.
137 net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
// Unregisters the network change observer added in the constructor.
140 AttachmentServiceImpl::~AttachmentServiceImpl() {
141 DCHECK(CalledOnValidThread());
142 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
// Builds an AttachmentServiceImpl backed by an in-memory store, a
// FakeAttachmentUploader and a FakeAttachmentDownloader, and returns ownership
// of it. NOTE(review): the remaining constructor arguments are not visible in
// this listing.
146 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
147 scoped_refptr<syncer::AttachmentStore> attachment_store =
148 AttachmentStore::CreateInMemoryStore();
149 scoped_ptr<AttachmentUploader> attachment_uploader(
150 new FakeAttachmentUploader);
151 scoped_ptr<AttachmentDownloader> attachment_downloader(
152 new FakeAttachmentDownloader());
153 scoped_ptr<syncer::AttachmentService> attachment_service(
154 new syncer::AttachmentServiceImpl(attachment_store,
155 attachment_uploader.Pass(),
156 attachment_downloader.Pass(),
160 return attachment_service.Pass();
// Returns the raw pointer held by |attachment_store_|.
163 AttachmentStore* AttachmentServiceImpl::GetStore() {
164 return attachment_store_.get();
// Creates a GetOrDownloadState for this request and starts reading
// |attachment_ids| from the store. The read completes in ReadDone, bound
// through a weak pointer to this object. NOTE(review): the remaining bound
// arguments are not visible in this listing.
167 void AttachmentServiceImpl::GetOrDownloadAttachments(
168 const AttachmentIdList& attachment_ids,
169 const GetOrDownloadCallback& callback) {
170 DCHECK(CalledOnValidThread());
171 scoped_refptr<GetOrDownloadState> state(
172 new GetOrDownloadState(attachment_ids, callback));
173 attachment_store_->Read(attachment_ids,
174 base::Bind(&AttachmentServiceImpl::ReadDone,
175 weak_ptr_factory_.GetWeakPtr(),
// Asks the store to drop |attachment_ids|. The drop completes in DropDone,
// bound through a weak pointer to this object. NOTE(review): the remaining
// bound arguments are not visible in this listing.
179 void AttachmentServiceImpl::DropAttachments(
180 const AttachmentIdList& attachment_ids,
181 const DropCallback& callback) {
182 DCHECK(CalledOnValidThread());
183 attachment_store_->Drop(attachment_ids,
184 base::Bind(&AttachmentServiceImpl::DropDone,
185 weak_ptr_factory_.GetWeakPtr(),
// Completion handler for the store read started by GetOrDownloadAttachments.
// Adds every attachment that was read to |state|. Each locally unavailable id
// is either handed to the downloader, when one exists, or recorded as
// unavailable. NOTE(review): |result| is not consulted in the visible lines.
189 void AttachmentServiceImpl::ReadDone(
190 const scoped_refptr<GetOrDownloadState>& state,
191 const AttachmentStore::Result& result,
192 scoped_ptr<AttachmentMap> attachments,
193 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
194 // Add read attachments to result.
195 for (AttachmentMap::const_iterator iter = attachments->begin();
196 iter != attachments->end();
198 state->AddAttachment(iter->second);
201 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
202 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
203 if (attachment_downloader_.get()) {
204 // Try to download locally unavailable attachments.
205 for (; iter != end; ++iter) {
// Each download completes in DownloadDone, bound through a weak pointer.
206 attachment_downloader_->DownloadAttachment(
208 base::Bind(&AttachmentServiceImpl::DownloadDone,
209 weak_ptr_factory_.GetWeakPtr(),
214 // No downloader so all locally unavailable attachments are unavailable.
215 for (; iter != end; ++iter) {
216 state->AddUnavailableAttachmentId(*iter);
// Completion handler for the store drop started by DropAttachments. Maps the
// store result to a DropResult and posts |callback| with it to the current
// MessageLoop.
221 void AttachmentServiceImpl::DropDone(const DropCallback& callback,
222 const AttachmentStore::Result& result) {
// Default to an error; only AttachmentStore::SUCCESS maps to DROP_SUCCESS.
223 AttachmentService::DropResult drop_result =
224 AttachmentService::DROP_UNSPECIFIED_ERROR;
225 if (result == AttachmentStore::SUCCESS) {
226 drop_result = AttachmentService::DROP_SUCCESS;
228 // TODO(maniscalco): Deal with case where an error occurred (bug 361251).
229 base::MessageLoop::current()->PostTask(FROM_HERE,
230 base::Bind(callback, drop_result));
// Completion handler for a single upload started by ReadDoneNowUpload. Updates
// the upload task queue according to |result|. NOTE(review): the switch
// header, break statements and any guard around the delegate call are not
// visible in this listing.
233 void AttachmentServiceImpl::UploadDone(
234 const AttachmentUploader::UploadResult& result,
235 const AttachmentId& attachment_id) {
236 DCHECK(CalledOnValidThread());
// Success: mark the task done and notify the delegate.
238 case AttachmentUploader::UPLOAD_SUCCESS:
239 upload_task_queue_->MarkAsSucceeded(attachment_id);
241 delegate_->OnAttachmentUploaded(attachment_id);
// Transient error: mark the attempt failed, then queue the id again.
244 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR:
245 upload_task_queue_->MarkAsFailed(attachment_id);
246 upload_task_queue_->AddToQueue(attachment_id);
// Unspecified error: mark the attempt failed; the id is not re-queued here.
248 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR:
249 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
250 upload_task_queue_->MarkAsFailed(attachment_id);
// Completion handler for a single download started by ReadDone. A successful
// download adds the attachment to |state|; a transient or unspecified error
// records |attachment_id| as unavailable. NOTE(review): the switch header and
// break statements are not visible in this listing.
255 void AttachmentServiceImpl::DownloadDone(
256 const scoped_refptr<GetOrDownloadState>& state,
257 const AttachmentId& attachment_id,
258 const AttachmentDownloader::DownloadResult& result,
259 scoped_ptr<Attachment> attachment) {
261 case AttachmentDownloader::DOWNLOAD_SUCCESS:
262 state->AddAttachment(*attachment.get());
// Both error kinds are handled the same way.
264 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR:
265 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR:
266 state->AddUnavailableAttachmentId(attachment_id);
// Bound as the upload task queue's callback in the constructor. Starts the
// upload of |attachment_id| by reading it from the store; the read completes
// in ReadDoneNowUpload, bound through a weak pointer to this object.
271 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) {
272 DCHECK(CalledOnValidThread());
// Read() takes a list, so wrap the single id.
273 AttachmentIdList attachment_ids;
274 attachment_ids.push_back(attachment_id);
275 attachment_store_->Read(attachment_ids,
276 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
277 weak_ptr_factory_.GetWeakPtr()));
// Adds every id in |attachment_ids| to the upload task queue.
280 void AttachmentServiceImpl::UploadAttachments(
281 const AttachmentIdSet& attachment_ids) {
282 DCHECK(CalledOnValidThread());
// NOTE(review): the body of this check is not visible in this listing;
// presumably it returns early when there is no uploader -- confirm.
283 if (!attachment_uploader_.get()) {
286 AttachmentIdSet::const_iterator iter = attachment_ids.begin();
287 AttachmentIdSet::const_iterator end = attachment_ids.end();
288 for (; iter != end; ++iter) {
289 upload_task_queue_->AddToQueue(*iter);
// Network change notification. Resets the upload queue's backoff whenever the
// new connection type is anything other than CONNECTION_NONE.
293 void AttachmentServiceImpl::OnNetworkChanged(
294 net::NetworkChangeNotifier::ConnectionType type) {
295 if (type != net::NetworkChangeNotifier::CONNECTION_NONE) {
296 upload_task_queue_->ResetBackoff();
// Completion handler for the store read started by BeginUpload. Cancels the
// queued upload for every id that could not be read, then hands each
// attachment that was read to the uploader. NOTE(review): |result| is not
// consulted in the visible lines.
300 void AttachmentServiceImpl::ReadDoneNowUpload(
301 const AttachmentStore::Result& result,
302 scoped_ptr<AttachmentMap> attachments,
303 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
304 DCHECK(CalledOnValidThread());
305 if (!unavailable_attachment_ids->empty()) {
306 // TODO(maniscalco): We failed to read some attachments. What should we do
308 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
309 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
310 for (; iter != end; ++iter) {
311 upload_task_queue_->Cancel(*iter);
315 AttachmentMap::const_iterator iter = attachments->begin();
316 AttachmentMap::const_iterator end = attachments->end();
317 for (; iter != end; ++iter) {
// Each upload completes in UploadDone, bound through a weak pointer.
318 attachment_uploader_->UploadAttachment(
320 base::Bind(&AttachmentServiceImpl::UploadDone,
321 weak_ptr_factory_.GetWeakPtr()));
// Test hook: passes ownership of |timer| to the upload task queue.
325 void AttachmentServiceImpl::SetTimerForTest(scoped_ptr<base::Timer> timer) {
326 upload_task_queue_->SetTimerForTest(timer.Pass());
329 } // namespace syncer