"io"
"net/http"
"os"
+ "sync"
. "git.tizen.org/tools/weles"
)
type Downloader struct {
notification chan ArtifactStatusChange // can be used to monitor ArtifactStatusChanges.
queue chan downloadJob
+ workers int
+ wg sync.WaitGroup
}
// downloadJob provides necessary info for download to be done.
type downloadJob struct {
path ArtifactPath
uri ArtifactURI
+ ch chan ArtifactStatusChange
}
// queueCap is the default length of download queue.
// newDownloader returns initilized Downloader.
func newDownloader(notification chan ArtifactStatusChange, workerCount int, queue int) *Downloader {
- return &Downloader{
+ d := &Downloader{
notification: notification,
queue: make(chan downloadJob, queue),
+ workers: workerCount,
}
+
+ // Start all workers.
+ d.wg.Add(d.workers)
+ for i := 0; i < d.workers; i++ {
+ go d.work()
+ }
+ return d
}
// NewDownloader returns Downloader initialized with default queue length
}
// Close is part of implementation of ArtifactDownloader interface.
-// It closes used channels.
+// It waits for running download jobs to stop and closes used channels.
func (d *Downloader) Close() {
- close(d.notification)
close(d.queue)
+ d.wg.Wait()
+ close(d.notification)
}
// getData downloads file from provided location and saves it in a prepared path.
func (d *Downloader) getData(URI ArtifactURI, path ArtifactPath) error {
-
resp, err := http.Get(string(URI))
if err != nil {
return err
defer file.Close()
_, err = io.Copy(file, resp.Body)
+
return err
}
} else {
change.NewStatus = AM_READY
}
-
notify(change, channels)
}
// Download is part of implementation of ArtifactDownloader interface.
-// TODO implement.
+// It puts new downloadJob on the queue.
func (d *Downloader) Download(URI ArtifactURI, path ArtifactPath, ch chan ArtifactStatusChange) error {
- return ErrNotImplemented
+ channels := []chan ArtifactStatusChange{ch, d.notification}
+ notify(ArtifactStatusChange{path, AM_PENDING}, channels)
+
+ job := downloadJob{
+ path: path,
+ uri: URI,
+ ch: ch,
+ }
+ select {
+ case d.queue <- job:
+ default:
+ return ErrQueueFull
+ }
+ return nil
+}
+
+func (d *Downloader) work() {
+ defer d.wg.Done()
+ for job := range d.queue {
+ d.download(job.uri, job.path, job.ch)
+ }
}
// CheckInCache is part of implementation of ArtifactDownloader interface.
Entry("fail when path is invalid", validURL, false, weles.AM_FAILED),
Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED),
)
+
+ DescribeTable("Download(): Notify ch channel about any changes",
+ func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) {
+ ts := prepareServer(url)
+ defer ts.Close()
+
+ ch := make(chan weles.ArtifactStatusChange, 5)
+ dir := validDir
+ if !valid {
+ dir = invalidDir
+ }
+ path := weles.ArtifactPath(filepath.Join(dir, "animal"))
+
+ err := platinumKoala.Download(weles.ArtifactURI(ts.URL), path, ch)
+ Expect(err).ToNot(HaveOccurred())
+
+ status := weles.ArtifactStatusChange{path, weles.AM_PENDING}
+ Eventually(ch).Should(Receive(Equal(status)))
+
+ status.NewStatus = weles.AM_DOWNLOADING
+ Eventually(ch).Should(Receive(Equal(status)))
+
+ status.NewStatus = finalResult
+ Eventually(ch).Should(Receive(Equal(status)))
+ },
+ Entry("download valid file to valid path", validURL, true, weles.AM_READY),
+ Entry("fail when url is invalid", invalidURL, true, weles.AM_FAILED),
+ Entry("fail when path is invalid", validURL, false, weles.AM_FAILED),
+ Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED),
+ )
+
+ DescribeTable("Download(): Download files to specified path.",
+ func(url weles.ArtifactURI, filename string, poem string) {
+ ts := prepareServer(url)
+ defer ts.Close()
+
+ ch := make(chan weles.ArtifactStatusChange, 5)
+ path := weles.ArtifactPath(filepath.Join(validDir, filename))
+
+ err := platinumKoala.Download(weles.ArtifactURI(ts.URL), path, ch)
+ Expect(err).ToNot(HaveOccurred())
+
+ Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_PENDING})))
+ Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_DOWNLOADING})))
+
+ if poem != "" {
+ Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_READY})))
+ content, err := ioutil.ReadFile(string(path))
+ Expect(err).ToNot(HaveOccurred())
+ Expect(string(content)).To(BeIdenticalTo(poem))
+ } else {
+ Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_FAILED})))
+ content, err := ioutil.ReadFile(string(path))
+ Expect(err).To(HaveOccurred())
+ Expect(content).To(BeNil())
+
+ }
+ },
+ Entry("download valid file to valid path", validURL, "pigs", pigs),
+ Entry("fail when url is invalid", invalidURL, "cows", nil),
+ )
})
--- /dev/null
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+// File errors.go provides definitions of downloader errors.
+
+package downloader
+
+import "errors"
+
+var (
+ //ErrQueueFull is returned when download queue is full.
+ ErrQueueFull = errors.New("downlad queue is full")
+)