"io"
"net/http"
"os"
+ "sync"
. "git.tizen.org/tools/weles"
)
type Downloader struct {
notification chan ArtifactStatusChange // can be used to monitor ArtifactStatusChanges.
queue chan downloadJob
+ wg sync.WaitGroup
}
// downloadJob provides necessary info for download to be done.
type downloadJob struct {
+ path ArtifactPath
+ uri ArtifactURI
+ ch chan ArtifactStatusChange
}
// queueCap is the default length of download queue.
const queueCap = 100
// newDownloader returns initilized Downloader.
-func newDownloader(notification chan ArtifactStatusChange, workerCount int, queueSize int) *Downloader {
+func newDownloader(notification chan ArtifactStatusChange, workers int, queueSize int) *Downloader {
- return &Downloader{
+ d := &Downloader{
notification: notification,
queue: make(chan downloadJob, queueSize),
}
+
+ // Start all workers.
+ d.wg.Add(workers)
+ for i := 0; i < workers; i++ {
+ go d.work()
+ }
+ return d
}
// NewDownloader returns Downloader initialized with default queue length
}
// Close is part of implementation of ArtifactDownloader interface.
-// It closes used channels.
+// It waits for running download jobs to stop and closes used channels.
func (d *Downloader) Close() {
close(d.queue)
+ d.wg.Wait()
}
// getData downloads file from provided location and saves it in a prepared path.
func (d *Downloader) getData(URI ArtifactURI, path ArtifactPath) error {
-
resp, err := http.Get(string(URI))
if err != nil {
return err
} else {
change.NewStatus = AM_READY
}
-
notify(change, channels)
}
// Download is part of implementation of ArtifactDownloader interface.
-// TODO implement.
+// It puts new downloadJob on the queue.
func (d *Downloader) Download(URI ArtifactURI, path ArtifactPath, ch chan ArtifactStatusChange) error {
- return ErrNotImplemented
+ channels := []chan ArtifactStatusChange{ch, d.notification}
+ notify(ArtifactStatusChange{path, AM_PENDING}, channels)
+ job := downloadJob{
+ path: path,
+ uri: URI,
+ ch: ch,
+ }
+
+ select {
+ case d.queue <- job:
+ default:
+ return ErrQueueFull
+ }
+ return nil
+}
+
+func (d *Downloader) work() {
+ defer d.wg.Done()
+ for job := range d.queue {
+ d.download(job.uri, job.path, job.ch)
+ }
}
// CheckInCache is part of implementation of ArtifactDownloader interface.
invalidDir string
validURL weles.ArtifactURI = "validURL"
invalidURL weles.ArtifactURI = "invalidURL"
+ ts *httptest.Server
+ ch chan weles.ArtifactStatusChange
)
+
var (
notifyCap int = 100 // notitication channel capacity.
notification chan weles.ArtifactStatusChange
Expect(err).ToNot(HaveOccurred())
// directory is not created therefore path will be invalid.
invalidDir = filepath.Join(tmpDir, "invalid")
+
+ ch = make(chan weles.ArtifactStatusChange, 5)
})
AfterEach(func() {
DescribeTable("getData(): Notify channels and save data to file",
func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) {
-
- ts := prepareServer(url)
+ ts = prepareServer(url)
defer ts.Close()
dir := validDir
DescribeTable("download(): Notify channels and save data to file",
func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) {
-
- ts := prepareServer(url)
+ ts = prepareServer(url)
defer ts.Close()
- ch := make(chan weles.ArtifactStatusChange, 5)
-
dir := validDir
if !valid {
dir = invalidDir
Entry("fail when path is invalid", validURL, false, weles.AM_FAILED),
Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED),
)
+
+ DescribeTable("Download(): Notify ch channel about any changes",
+ func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) {
+ ts = prepareServer(url)
+ defer ts.Close()
+
+ dir := validDir
+ if !valid {
+ dir = invalidDir
+ }
+ path := weles.ArtifactPath(filepath.Join(dir, "animal"))
+
+ err := platinumKoala.Download(weles.ArtifactURI(ts.URL), path, ch)
+ Expect(err).ToNot(HaveOccurred())
+
+ status := weles.ArtifactStatusChange{path, weles.AM_PENDING}
+ Eventually(ch).Should(Receive(Equal(status)))
+
+ status.NewStatus = weles.AM_DOWNLOADING
+ Eventually(ch).Should(Receive(Equal(status)))
+
+ status.NewStatus = finalResult
+ Eventually(ch).Should(Receive(Equal(status)))
+ },
+ Entry("download valid file to valid path", validURL, true, weles.AM_READY),
+ Entry("fail when url is invalid", invalidURL, true, weles.AM_FAILED),
+ Entry("fail when path is invalid", validURL, false, weles.AM_FAILED),
+ Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED),
+ )
+
+ DescribeTable("Download(): Download files to specified path.",
+ func(url weles.ArtifactURI, filename string, poem string) {
+ ts = prepareServer(url)
+ defer ts.Close()
+
+ path := weles.ArtifactPath(filepath.Join(validDir, filename))
+
+ err := platinumKoala.Download(weles.ArtifactURI(ts.URL), path, ch)
+ Expect(err).ToNot(HaveOccurred())
+
+ Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_PENDING})))
+ Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_DOWNLOADING})))
+
+ if poem != "" {
+ Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_READY})))
+ content, err := ioutil.ReadFile(string(path))
+ Expect(err).ToNot(HaveOccurred())
+ Expect(string(content)).To(BeIdenticalTo(poem))
+ } else {
+ Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_FAILED})))
+ content, err := ioutil.ReadFile(string(path))
+ Expect(err).To(HaveOccurred())
+ Expect(content).To(BeNil())
+
+ }
+ },
+ Entry("download valid file to valid path", validURL, "pigs", pigs),
+ Entry("fail when url is invalid", invalidURL, "cows", nil),
+ )
+
+ Describe("DownloadJob queue capacity", func() {
+ It("should return error if queue if full.", func() {
+ ts = prepareServer(validURL)
+
+ notification := make(chan weles.ArtifactStatusChange, notifyCap)
+ ironGopher := newDownloader(notification, 0, 0)
+ defer ironGopher.Close()
+
+ path := weles.ArtifactPath(filepath.Join(validDir, "file"))
+
+ err := ironGopher.Download(weles.ArtifactURI(ts.URL), path, ch)
+ Expect(err).To(Equal(ErrQueueFull))
+ })
+ })
})