Add downloadJob queue 51/162351/12
authorKatarzyna Gorska <k.gorska@samsung.com>
Tue, 28 Nov 2017 13:51:24 +0000 (14:51 +0100)
committerk.gorska <k.gorska@samsung.com>
Wed, 24 Jan 2018 14:16:28 +0000 (15:16 +0100)
Change-Id: I90afb2433977be3ba5b72935d88b289e28e97b2e
Signed-off-by: Katarzyna Gorska <k.gorska@samsung.com>
artifacts/downloader/downloader.go
artifacts/downloader/downloader_test.go
artifacts/downloader/errors.go [new file with mode: 0644]

index 0dbb5fb..2edf024 100644 (file)
@@ -22,6 +22,7 @@ import (
        "io"
        "net/http"
        "os"
+       "sync"
 
        . "git.tizen.org/tools/weles"
 )
@@ -30,22 +31,33 @@ import (
 type Downloader struct {
        notification chan ArtifactStatusChange // can be used to monitor ArtifactStatusChanges.
        queue        chan downloadJob
+       wg           sync.WaitGroup
 }
 
 // downloadJob provides necessary info for download to be done.
 type downloadJob struct {
+       path ArtifactPath
+       uri  ArtifactURI
+       ch   chan ArtifactStatusChange
 }
 
 // queueCap is the default length of download queue.
 const queueCap = 100
 
 // newDownloader returns initilized Downloader.
-func newDownloader(notification chan ArtifactStatusChange, workerCount int, queueSize int) *Downloader {
+func newDownloader(notification chan ArtifactStatusChange, workers int, queueSize int) *Downloader {
 
-       return &Downloader{
+       d := &Downloader{
                notification: notification,
                queue:        make(chan downloadJob, queueSize),
        }
+
+       // Start all workers.
+       d.wg.Add(workers)
+       for i := 0; i < workers; i++ {
+               go d.work()
+       }
+       return d
 }
 
 // NewDownloader returns Downloader initialized  with default queue length
@@ -54,14 +66,14 @@ func NewDownloader(notification chan ArtifactStatusChange, workerCount int) *Dow
 }
 
 // Close is part of implementation of ArtifactDownloader interface.
-// It closes used channels.
+// It waits for running download jobs to stop and closes used channels.
 func (d *Downloader) Close() {
        close(d.queue)
+       d.wg.Wait()
 }
 
 // getData downloads file from provided location and saves it in a prepared path.
 func (d *Downloader) getData(URI ArtifactURI, path ArtifactPath) error {
-
        resp, err := http.Get(string(URI))
        if err != nil {
                return err
@@ -104,15 +116,34 @@ func (d *Downloader) download(URI ArtifactURI, path ArtifactPath, ch chan Artifa
        } else {
                change.NewStatus = AM_READY
        }
-
        notify(change, channels)
 }
 
 // Download is part of implementation of ArtifactDownloader interface.
-// TODO implement.
+// It puts new downloadJob on the queue.
 func (d *Downloader) Download(URI ArtifactURI, path ArtifactPath, ch chan ArtifactStatusChange) error {
-       return ErrNotImplemented
+       channels := []chan ArtifactStatusChange{ch, d.notification}
+       notify(ArtifactStatusChange{path, AM_PENDING}, channels)
 
+       job := downloadJob{
+               path: path,
+               uri:  URI,
+               ch:   ch,
+       }
+
+       select {
+       case d.queue <- job:
+       default:
+               return ErrQueueFull
+       }
+       return nil
+}
+
+func (d *Downloader) work() {
+       defer d.wg.Done()
+       for job := range d.queue {
+               d.download(job.uri, job.path, job.ch)
+       }
 }
 
 // CheckInCache is part of implementation of ArtifactDownloader interface.
index 4956243..c6537b3 100644 (file)
@@ -47,7 +47,10 @@ I call it stupid of the pig.
                invalidDir string
                validURL   weles.ArtifactURI = "validURL"
                invalidURL weles.ArtifactURI = "invalidURL"
+               ts         *httptest.Server
+               ch         chan weles.ArtifactStatusChange
        )
+
        var (
                notifyCap    int = 100 // notitication channel capacity.
                notification chan weles.ArtifactStatusChange
@@ -74,6 +77,8 @@ I call it stupid of the pig.
                Expect(err).ToNot(HaveOccurred())
                // directory is not created therefore path will be invalid.
                invalidDir = filepath.Join(tmpDir, "invalid")
+
+               ch = make(chan weles.ArtifactStatusChange, 5)
        })
 
        AfterEach(func() {
@@ -97,8 +102,7 @@ I call it stupid of the pig.
 
        DescribeTable("getData(): Notify channels and save data to file",
                func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) {
-
-                       ts := prepareServer(url)
+                       ts = prepareServer(url)
                        defer ts.Close()
 
                        dir := validDir
@@ -129,12 +133,9 @@ I call it stupid of the pig.
 
        DescribeTable("download(): Notify channels and save data to file",
                func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) {
-
-                       ts := prepareServer(url)
+                       ts = prepareServer(url)
                        defer ts.Close()
 
-                       ch := make(chan weles.ArtifactStatusChange, 5)
-
                        dir := validDir
                        if !valid {
                                dir = invalidDir
@@ -165,4 +166,78 @@ I call it stupid of the pig.
                Entry("fail when path is invalid", validURL, false, weles.AM_FAILED),
                Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED),
        )
+
+       DescribeTable("Download(): Notify ch channel about any changes",
+               func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) {
+                       ts = prepareServer(url)
+                       defer ts.Close()
+
+                       dir := validDir
+                       if !valid {
+                               dir = invalidDir
+                       }
+                       path := weles.ArtifactPath(filepath.Join(dir, "animal"))
+
+                       err := platinumKoala.Download(weles.ArtifactURI(ts.URL), path, ch)
+                       Expect(err).ToNot(HaveOccurred())
+
+                       status := weles.ArtifactStatusChange{path, weles.AM_PENDING}
+                       Eventually(ch).Should(Receive(Equal(status)))
+
+                       status.NewStatus = weles.AM_DOWNLOADING
+                       Eventually(ch).Should(Receive(Equal(status)))
+
+                       status.NewStatus = finalResult
+                       Eventually(ch).Should(Receive(Equal(status)))
+               },
+               Entry("download valid file to valid path", validURL, true, weles.AM_READY),
+               Entry("fail when url is invalid", invalidURL, true, weles.AM_FAILED),
+               Entry("fail when path is invalid", validURL, false, weles.AM_FAILED),
+               Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED),
+       )
+
+       DescribeTable("Download(): Download files to specified path.",
+               func(url weles.ArtifactURI, filename string, poem string) {
+                       ts = prepareServer(url)
+                       defer ts.Close()
+
+                       path := weles.ArtifactPath(filepath.Join(validDir, filename))
+
+                       err := platinumKoala.Download(weles.ArtifactURI(ts.URL), path, ch)
+                       Expect(err).ToNot(HaveOccurred())
+
+                       Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_PENDING})))
+                       Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_DOWNLOADING})))
+
+                       if poem != "" {
+                               Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_READY})))
+                               content, err := ioutil.ReadFile(string(path))
+                               Expect(err).ToNot(HaveOccurred())
+                               Expect(string(content)).To(BeIdenticalTo(poem))
+                       } else {
+                               Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_FAILED})))
+                               content, err := ioutil.ReadFile(string(path))
+                               Expect(err).To(HaveOccurred())
+                               Expect(content).To(BeNil())
+
+                       }
+               },
+               Entry("download valid file to valid path", validURL, "pigs", pigs),
+               Entry("fail when url is invalid", invalidURL, "cows", nil),
+       )
+
+       Describe("DownloadJob queue capacity", func() {
+               It("should return error if queue if full.", func() {
+                       ts = prepareServer(validURL)
+
+                       notification := make(chan weles.ArtifactStatusChange, notifyCap)
+                       ironGopher := newDownloader(notification, 0, 0)
+                       defer ironGopher.Close()
+
+                       path := weles.ArtifactPath(filepath.Join(validDir, "file"))
+
+                       err := ironGopher.Download(weles.ArtifactURI(ts.URL), path, ch)
+                       Expect(err).To(Equal(ErrQueueFull))
+               })
+       })
 })
diff --git a/artifacts/downloader/errors.go b/artifacts/downloader/errors.go
new file mode 100644 (file)
index 0000000..359da59
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ *  Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+// File errors.go provides definitions of downloader errors.
+
+package downloader
+
+import "errors"
+
+var (
+       //ErrQueueFull is returned when download queue is full.
+       ErrQueueFull = errors.New("downlad queue is full")
+)