Add downloadJob queue 51/162351/8
authorKatarzyna Gorska <k.gorska@samsung.com>
Tue, 28 Nov 2017 13:51:24 +0000 (14:51 +0100)
committerKatarzyna Gorska <k.gorska@samsung.com>
Fri, 15 Dec 2017 11:16:00 +0000 (12:16 +0100)
Change-Id: I90afb2433977be3ba5b72935d88b289e28e97b2e
Signed-off-by: Katarzyna Gorska <k.gorska@samsung.com>
artifacts/downloader/downloader.go
artifacts/downloader/downloader_test.go
artifacts/downloader/errors.go [new file with mode: 0644]

index 70a225d..0111010 100644 (file)
@@ -22,6 +22,7 @@ import (
        "io"
        "net/http"
        "os"
+       "sync"
 
        . "git.tizen.org/tools/weles"
 )
@@ -30,12 +31,15 @@ import (
 type Downloader struct {
        notification chan ArtifactStatusChange // can be used to monitor ArtifactStatusChanges.
        queue        chan downloadJob
+       workers      int
+       wg           sync.WaitGroup
 }
 
 // downloadJob provides necessary info for download to be done.
 type downloadJob struct {
        path ArtifactPath
        uri  ArtifactURI
+       ch   chan ArtifactStatusChange
 }
 
 // queueCap is the default length of download queue.
@@ -44,10 +48,18 @@ var queueCap = 100
 // newDownloader returns initilized Downloader.
 func newDownloader(notification chan ArtifactStatusChange, workerCount int, queue int) *Downloader {
 
-       return &Downloader{
+       d := &Downloader{
                notification: notification,
                queue:        make(chan downloadJob, queue),
+               workers:      workerCount,
        }
+
+       // Start all workers.
+       d.wg.Add(d.workers)
+       for i := 0; i < d.workers; i++ {
+               go d.work()
+       }
+       return d
 }
 
 // NewDownloader returns Downloader initialized  with default queue length
@@ -56,15 +68,15 @@ func NewDownloader(notification chan ArtifactStatusChange, workerCount int) *Dow
 }
 
 // Close is part of implementation of ArtifactDownloader interface.
-// It closes used channels.
+// It waits for running download jobs to stop and closes used channels.
 func (d *Downloader) Close() {
-       close(d.notification)
        close(d.queue)
+       d.wg.Wait()
+       close(d.notification)
 }
 
 // getData downloads file from provided location and saves it in a prepared path.
 func (d *Downloader) getData(URI ArtifactURI, path ArtifactPath) error {
-
        resp, err := http.Get(string(URI))
        if err != nil {
                return err
@@ -82,6 +94,7 @@ func (d *Downloader) getData(URI ArtifactURI, path ArtifactPath) error {
        defer file.Close()
 
        _, err = io.Copy(file, resp.Body)
+
        return err
 }
 
@@ -107,15 +120,34 @@ func (d *Downloader) download(URI ArtifactURI, path ArtifactPath, ch chan Artifa
        } else {
                change.NewStatus = AM_READY
        }
-
        notify(change, channels)
 }
 
 // Download is part of implementation of ArtifactDownloader interface.
-// TODO implement.
+// It puts new downloadJob on the queue.
 func (d *Downloader) Download(URI ArtifactURI, path ArtifactPath, ch chan ArtifactStatusChange) error {
-       return ErrNotImplemented
+       channels := []chan ArtifactStatusChange{ch, d.notification}
+       notify(ArtifactStatusChange{path, AM_PENDING}, channels)
+
+       job := downloadJob{
+               path: path,
+               uri:  URI,
+               ch:   ch,
+       }
 
+       select {
+       case d.queue <- job:
+       default:
+               return ErrQueueFull
+       }
+       return nil
+}
+
+func (d *Downloader) work() {
+       defer d.wg.Done()
+       for job := range d.queue {
+               d.download(job.uri, job.path, job.ch)
+       }
 }
 
 // CheckInCache is part of implementation of ArtifactDownloader interface.
index bd22a0d..9b879de 100644 (file)
@@ -157,4 +157,65 @@ var _ = Describe("Downloader", func() {
                Entry("fail when path is invalid", validURL, false, weles.AM_FAILED),
                Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED),
        )
+
+       DescribeTable("Download(): Notify ch channel about any changes",
+               func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) {
+                       ts := prepareServer(url)
+                       defer ts.Close()
+
+                       ch := make(chan weles.ArtifactStatusChange, 5)
+                       dir := validDir
+                       if !valid {
+                               dir = invalidDir
+                       }
+                       path := weles.ArtifactPath(filepath.Join(dir, "animal"))
+
+                       err := platinumKoala.Download(weles.ArtifactURI(ts.URL), path, ch)
+                       Expect(err).ToNot(HaveOccurred())
+
+                       status := weles.ArtifactStatusChange{path, weles.AM_PENDING}
+                       Eventually(ch).Should(Receive(Equal(status)))
+
+                       status.NewStatus = weles.AM_DOWNLOADING
+                       Eventually(ch).Should(Receive(Equal(status)))
+
+                       status.NewStatus = finalResult
+                       Eventually(ch).Should(Receive(Equal(status)))
+               },
+               Entry("download valid file to valid path", validURL, true, weles.AM_READY),
+               Entry("fail when url is invalid", invalidURL, true, weles.AM_FAILED),
+               Entry("fail when path is invalid", validURL, false, weles.AM_FAILED),
+               Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED),
+       )
+
+       DescribeTable("Download(): Download files to specified path.",
+               func(url weles.ArtifactURI, filename string, poem string) {
+                       ts := prepareServer(url)
+                       defer ts.Close()
+
+                       ch := make(chan weles.ArtifactStatusChange, 5)
+                       path := weles.ArtifactPath(filepath.Join(validDir, filename))
+
+                       err := platinumKoala.Download(weles.ArtifactURI(ts.URL), path, ch)
+                       Expect(err).ToNot(HaveOccurred())
+
+                       Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_PENDING})))
+                       Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_DOWNLOADING})))
+
+                       if poem != "" {
+                               Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_READY})))
+                               content, err := ioutil.ReadFile(string(path))
+                               Expect(err).ToNot(HaveOccurred())
+                               Expect(string(content)).To(BeIdenticalTo(poem))
+                       } else {
+                               Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_FAILED})))
+                               content, err := ioutil.ReadFile(string(path))
+                               Expect(err).To(HaveOccurred())
+                               Expect(content).To(BeNil())
+
+                       }
+               },
+               Entry("download valid file to valid path", validURL, "pigs", pigs),
+               Entry("fail when url is invalid", invalidURL, "cows", nil),
+       )
 })
diff --git a/artifacts/downloader/errors.go b/artifacts/downloader/errors.go
new file mode 100644 (file)
index 0000000..359da59
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ *  Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+// File errors.go provides definitions of downloader errors.
+
+package downloader
+
+import "errors"
+
+var (
+       //ErrQueueFull is returned when download queue is full.
+       ErrQueueFull = errors.New("downlad queue is full")
+)