Add Downloader implementation with tests 15/162015/4
authorLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Mon, 20 Nov 2017 05:04:31 +0000 (06:04 +0100)
committerLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Mon, 4 Dec 2017 10:01:24 +0000 (11:01 +0100)
DownloaderImpl is responsible for parsing Job's config and searching
for information about files (artefacts) related to the test.
It delegates to ArtifactsManager downloading images and testfiles
required to run Job (Push). It creates also paths for output
artifacts (Pull). It updates Job's config with proper paths
for these artefacts.

It does also monitor process of downloading files in ArtifactManager
and finally it notifies Controller when all files are downloaded
or in case of failure.

Change-Id: I578108dd1a924aaf36435e4dbc5d8dcce50214ee
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
controller/downloaderimpl.go [new file with mode: 0644]
controller/downloaderimpl_test.go [new file with mode: 0644]

diff --git a/controller/downloaderimpl.go b/controller/downloaderimpl.go
new file mode 100644 (file)
index 0000000..e4d6f92
--- /dev/null
@@ -0,0 +1,327 @@
+/*
+ *  Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+// File controller/downloaderimpl.go implements Downloader.
+
+package controller
+
+import (
+       "fmt"
+       "sync"
+
+       . "git.tizen.org/tools/weles"
+       "git.tizen.org/tools/weles/controller/notifier"
+)
+
+// jobArtifactsInfo contains information about progress of downloading
+// artifacts required by a single Job.
+type jobArtifactsInfo struct {
+       paths       int
+       ready       int
+       failed      int
+       configSaved bool
+}
+
+// DownloaderImpl implements delegating downloading of artifacts required
+// to run Jobs to ArtifactsManager, monitors progress and notifies
+// Controller, when all files are ready.
+type DownloaderImpl struct {
+       // Notifier provides channel for communication with Controller.
+       notifier.Notifier
+       // jobs references module implementing Jobs management.
+       jobs JobsController
+       // artifacts references Weles module implementing ArtifactManager for
+       // managing ArtifactsDB.
+       artifacts ArtifactManager
+       // collector gathers artifact status changes from ArtifactManager
+       collector chan ArtifactStatusChange
+
+       // path2Job identifies Job related to the artifact path.
+       path2Job map[string]JobID
+       // info contains information about progress of downloading artifacts
+       // for Jobs.
+       info map[JobID]jobArtifactsInfo
+       //mutex protects access to path2Job and info maps.
+       mutex *sync.Mutex
+}
+
+// NewDownloader creates a new DownloaderImpl structure setting up references
+// to used Weles modules.
+func NewDownloader(j JobsController, a ArtifactManager) Downloader {
+       ret := &DownloaderImpl{
+               Notifier:  notifier.NewNotifier(),
+               jobs:      j,
+               artifacts: a,
+               collector: make(chan ArtifactStatusChange),
+               path2Job:  make(map[string]JobID),
+               info:      make(map[JobID]jobArtifactsInfo),
+               mutex:     new(sync.Mutex),
+       }
+       go ret.loop()
+       return ret
+}
+
+// pathStatusChange reacts on notification from ArtifactManager and updates
+// path and job structures.
+func (h *DownloaderImpl) pathStatusChange(path string, status ArtifactStatus) (changed bool, j JobID, info string) {
+       h.mutex.Lock()
+       defer h.mutex.Unlock()
+       j, ok := h.path2Job[path]
+       if !ok {
+               return
+       }
+       i, ok := h.info[j]
+       if !ok {
+               delete(h.path2Job, path)
+               return
+       }
+       switch status {
+       case AM_READY:
+               i.ready++
+               info = fmt.Sprintf("%d / %d artifacts ready", i.ready, i.paths)
+       case AM_FAILED:
+               i.failed++
+               info = "Failed to download artifact"
+       default:
+               return
+       }
+       h.info[j] = i
+       delete(h.path2Job, path)
+       changed = true
+       return
+}
+
+// removePath removes mapping from the path to related Job.
+func (h *DownloaderImpl) removePath(path string) {
+       h.mutex.Lock()
+       defer h.mutex.Unlock()
+       delete(h.path2Job, path)
+}
+
+// loop handles all notifications from ArtficatManager, updates
+// jobArtficatsInfo and send notification to Controller, when the answer
+// is ready.
+//
+// It is run in a separate goroutine - one for all jobs.
+func (h *DownloaderImpl) loop() {
+       for {
+               change := <-h.collector
+               update, j, info := h.pathStatusChange(string(change.Path), change.NewStatus)
+               if !update {
+                       continue
+               }
+
+               err := h.jobs.SetStatusAndInfo(j, JOB_DOWNLOADING, info)
+               if err != nil {
+                       h.removePath(string(change.Path))
+                       h.fail(j, fmt.Sprintf("Internal Weles error while changing Job status : %s", err.Error()))
+               }
+               h.sendIfReady(j)
+       }
+}
+
+// fail responses failure to Controller.
+func (h *DownloaderImpl) fail(j JobID, msg string) {
+       if h.removeJobInfo(j) == nil {
+               h.SendFail(j, msg)
+       }
+}
+
+// succeed responses success to Controller.
+func (h *DownloaderImpl) succeed(j JobID) {
+       if h.removeJobInfo(j) == nil {
+               h.SendOK(j)
+       }
+}
+
+// addJobInfo creates a jobArtifactInfo structure.
+func (h *DownloaderImpl) addJobInfo(j JobID) {
+       h.mutex.Lock()
+       defer h.mutex.Unlock()
+       i, ok := h.info[j]
+       if !ok {
+               i = jobArtifactsInfo{}
+       }
+       h.info[j] = i
+}
+
+// removeJobInfo removes a jobArtifactInfo structure.
+func (h *DownloaderImpl) removeJobInfo(j JobID) error {
+       h.mutex.Lock()
+       defer h.mutex.Unlock()
+
+       _, ok := h.info[j]
+       if !ok {
+               return ErrJobNotFound
+       }
+       delete(h.info, j)
+       return nil
+}
+
+// push delegates downloading single uri to ArtifactDB.
+func (h *DownloaderImpl) push(j JobID, t ArtifactType, alias string, uri string) (string, error) {
+       p, err := h.artifacts.PushArtifact(ArtifactDescription{
+               JobID: j,
+               Type:  t,
+               Alias: ArtifactAlias(alias),
+               URI:   ArtifactURI(uri),
+       }, h.collector)
+       if err != nil {
+               return "", err
+       }
+
+       h.mutex.Lock()
+       defer h.mutex.Unlock()
+
+       i, ok := h.info[j]
+       if !ok {
+               return "", ErrJobNotFound
+       }
+       i.paths++
+       h.info[j] = i
+       h.path2Job[string(p)] = j
+
+       return string(p), nil
+}
+
+// pullCreate creates a new path for pull artifact.
+func (h *DownloaderImpl) pullCreate(j JobID, alias string) (string, error) {
+       p, err := h.artifacts.CreateArtifact(ArtifactDescription{
+               JobID: j,
+               Type:  AM_TESTFILE,
+               Alias: ArtifactAlias(alias),
+       })
+       return string(p), err
+}
+
+// configSaved updates info structure.
+func (h *DownloaderImpl) configSaved(j JobID) {
+       h.mutex.Lock()
+       defer h.mutex.Unlock()
+
+       i, ok := h.info[j]
+       if !ok {
+               return
+       }
+
+       i.configSaved = true
+       h.info[j] = i
+}
+
+// verify if an answer to the Controller should be send.
+func (h *DownloaderImpl) verify(j JobID) (ok, fail bool) {
+       h.mutex.Lock()
+       defer h.mutex.Unlock()
+
+       i, ok := h.info[j]
+       if !ok { // Job is not monitored (maybe it has been responded already).
+               return false, false
+       }
+       if i.failed > 0 { // Some artifacts fail to be downloaded.
+               return false, true
+       }
+       if !i.configSaved { // Config is not yet fully analyzed and saved.
+               return false, false
+       }
+       if i.ready == i.paths { // All artifacts are ready.
+               return true, false
+       }
+       return false, false
+}
+
+// sendIfReady sends an answer to the Controller if it is ready.
+func (h *DownloaderImpl) sendIfReady(j JobID) {
+       ok, failed := h.verify(j)
+
+       if ok {
+               h.succeed(j)
+               return
+       }
+       if failed {
+               h.fail(j, "Failed to download all artifacts for the Job")
+       }
+}
+
+// Download parses Job's config and delegates to ArtifactManager downloading
+// of all images and files to bu pushed during Job execution. It also creates
+// ArtifactDB paths for files that will be pulled from Dryad.
+func (h *DownloaderImpl) Download(j JobID) {
+       h.addJobInfo(j)
+
+       err := h.jobs.SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+       if err != nil {
+               h.fail(j, fmt.Sprintf("Internal Weles error while changing Job status : %s", err.Error()))
+               return
+       }
+
+       config, err := h.jobs.GetConfig(j)
+       if err != nil {
+               h.fail(j, fmt.Sprintf("Internal Weles error while getting Job config : %s", err.Error()))
+               return
+       }
+
+       for i, image := range config.Action.Deploy.Images {
+               if image.Uri != "" {
+                       path, err := h.push(j, AM_IMAGEFILE, fmt.Sprintf("Image_%d", i), image.Uri)
+                       if err != nil {
+                               h.fail(j, fmt.Sprintf("Internal Weles error while registering URI:<%s> in ArtifactManager : %s", image.Uri, err.Error()))
+                               return
+                       }
+                       config.Action.Deploy.Images[i].Path = path
+               }
+               if image.Md5Uri != "" {
+                       path, err := h.push(j, AM_IMAGEFILE, fmt.Sprintf("ImageMD5_%d", i), image.Md5Uri)
+                       if err != nil {
+                               h.fail(j, fmt.Sprintf("Internal Weles error while registering URI:<%s> in ArtifactManager : %s", image.Md5Uri, err.Error()))
+                               return
+                       }
+                       config.Action.Deploy.Images[i].MD5Path = path
+               }
+       }
+       for i, tc := range config.Action.Test.TestCases {
+               for k, ta := range tc.TestActions {
+                       switch ta.(type) {
+                       case Push:
+                               action := ta.(Push)
+                               path, err := h.push(j, AM_TESTFILE, action.Alias, action.Uri)
+                               if err != nil {
+                                       h.fail(j, fmt.Sprintf("Internal Weles error while registering URI:<%s> in ArtifactManager : %s", action.Uri, err.Error()))
+                                       return
+                               }
+                               action.Path = path
+                               config.Action.Test.TestCases[i].TestActions[k] = action
+                       case Pull:
+                               action := ta.(Pull)
+                               path, err := h.pullCreate(j, action.Alias)
+                               if err != nil {
+                                       h.fail(j, fmt.Sprintf("Internal Weles error while creating a new path in ArtifactManager : %s", err.Error()))
+                                       return
+                               }
+                               action.Path = path
+                               config.Action.Test.TestCases[i].TestActions[k] = action
+                       }
+               }
+       }
+
+       err = h.jobs.SetConfig(j, config)
+       if err != nil {
+               h.fail(j, fmt.Sprintf("Internal Weles error while setting config : %s", err.Error()))
+               return
+       }
+
+       h.configSaved(j)
+       h.sendIfReady(j)
+}
diff --git a/controller/downloaderimpl_test.go b/controller/downloaderimpl_test.go
new file mode 100644 (file)
index 0000000..47e3af8
--- /dev/null
@@ -0,0 +1,547 @@
+/*
+ *  Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package controller
+
+import (
+       "errors"
+       "fmt"
+       "sync"
+
+       . "git.tizen.org/tools/weles"
+       cmock "git.tizen.org/tools/weles/controller/mock"
+       . "git.tizen.org/tools/weles/controller/notifier"
+       mock "git.tizen.org/tools/weles/mock"
+       gomock "github.com/golang/mock/gomock"
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+var _ = Describe("DownloaderImpl", func() {
+       var r <-chan Notification
+       var jc *cmock.MockJobsController
+       var am *mock.MockArtifactManager
+       var h Downloader
+       var ctrl *gomock.Controller
+       j := JobID(0xCAFE)
+       paths := []string{}
+       for i := 0; i < 9; i++ {
+               paths = append(paths, fmt.Sprintf("path_%d", i))
+       }
+       config := Config{Action: Action{
+               Deploy: Deploy{Images: []ImageDefinition{
+                       ImageDefinition{Uri: "image_0", Md5Uri: "md5_0"},
+                       ImageDefinition{Uri: "image_1"},
+                       ImageDefinition{Md5Uri: "md5_2"},
+               }},
+               Test: Test{TestCases: []TestCase{
+                       TestCase{TestActions: []TestAction{
+                               Push{Uri: "uri_0", Alias: "alias_0"},
+                               Push{Uri: "uri_1", Alias: "alias_1"},
+                               Pull{Alias: "alias_2"},
+                       }},
+                       TestCase{TestActions: []TestAction{
+                               Push{Uri: "uri_3", Alias: "alias_3"},
+                       }},
+                       TestCase{TestActions: []TestAction{
+                               Pull{Alias: "alias_4"},
+                       }},
+               }},
+       }}
+       updatedConfig := Config{Action: Action{
+               Deploy: Deploy{Images: []ImageDefinition{
+                       ImageDefinition{Uri: "image_0", Md5Uri: "md5_0", Path: paths[0], MD5Path: paths[1]},
+                       ImageDefinition{Uri: "image_1", Path: paths[2]},
+                       ImageDefinition{Md5Uri: "md5_2", MD5Path: paths[3]},
+               }},
+               Test: Test{TestCases: []TestCase{
+                       TestCase{TestActions: []TestAction{
+                               Push{Uri: "uri_0", Alias: "alias_0", Path: paths[4]},
+                               Push{Uri: "uri_1", Alias: "alias_1", Path: paths[5]},
+                               Pull{Alias: "alias_2", Path: paths[7]},
+                       }},
+                       TestCase{TestActions: []TestAction{
+                               Push{Uri: "uri_3", Alias: "alias_3", Path: paths[6]},
+                       }},
+                       TestCase{TestActions: []TestAction{
+                               Pull{Alias: "alias_4", Path: paths[8]},
+                       }},
+               }},
+       }}
+       err := errors.New("test error")
+
+       BeforeEach(func() {
+               ctrl = gomock.NewController(GinkgoT())
+
+               jc = cmock.NewMockJobsController(ctrl)
+               am = mock.NewMockArtifactManager(ctrl)
+
+               h = NewDownloader(jc, am)
+               r = h.Listen()
+       })
+       AfterEach(func() {
+               ctrl.Finish()
+       })
+       Describe("NewDownloader", func() {
+               It("should create a new object", func() {
+                       Expect(h).NotTo(BeNil())
+                       Expect(h.(*DownloaderImpl).jobs).To(Equal(jc))
+                       Expect(h.(*DownloaderImpl).artifacts).To(Equal(am))
+                       Expect(h.(*DownloaderImpl).collector).NotTo(BeNil())
+                       Expect(h.(*DownloaderImpl).path2Job).NotTo(BeNil())
+                       Expect(h.(*DownloaderImpl).info).NotTo(BeNil())
+                       Expect(h.(*DownloaderImpl).mutex).NotTo(BeNil())
+               })
+       })
+       Describe("Download", func() {
+               sendChange := func(from, to int, status ArtifactStatus) {
+                       for i := from; i < to; i++ {
+                               h.(*DownloaderImpl).collector <- ArtifactStatusChange{Path: ArtifactPath(paths[i]), NewStatus: status}
+                       }
+               }
+               eventuallyNoti := func(offset int, ok bool, msg string) {
+                       notification := Notification{}
+                       expectedNotification := Notification{
+                               JobID: j,
+                               OK:    ok,
+                               Msg:   msg,
+                       }
+                       EventuallyWithOffset(offset, r).Should(Receive(&notification))
+                       ExpectWithOffset(offset, notification).To(Equal(expectedNotification))
+               }
+               eventuallyPathEmpty := func(offset int) {
+                       EventuallyWithOffset(offset, func() int {
+                               h.(*DownloaderImpl).mutex.Lock()
+                               defer h.(*DownloaderImpl).mutex.Unlock()
+                               return len(h.(*DownloaderImpl).path2Job)
+                       }).Should(BeZero())
+               }
+               eventuallyInfoEmpty := func(offset int) {
+                       EventuallyWithOffset(offset, func() int {
+                               h.(*DownloaderImpl).mutex.Lock()
+                               defer h.(*DownloaderImpl).mutex.Unlock()
+                               return len(h.(*DownloaderImpl).info)
+                       }).Should(BeZero())
+               }
+               eventuallyEmpty := func(offset int) {
+                       eventuallyPathEmpty(offset + 1)
+                       eventuallyInfoEmpty(offset + 1)
+               }
+               expectInfo := func(offset int, config bool, paths int) {
+                       h.(*DownloaderImpl).mutex.Lock()
+                       defer h.(*DownloaderImpl).mutex.Unlock()
+                       ExpectWithOffset(offset, len(h.(*DownloaderImpl).info)).To(Equal(1))
+                       v, ok := h.(*DownloaderImpl).info[j]
+                       ExpectWithOffset(offset, ok).To(BeTrue())
+                       ExpectWithOffset(offset, v.configSaved).To(Equal(config))
+                       ExpectWithOffset(offset, v.failed).To(Equal(0))
+                       ExpectWithOffset(offset, v.paths).To(Equal(paths))
+                       ExpectWithOffset(offset, v.ready).To(Equal(0))
+               }
+               expectPath := func(offset, from, to int) {
+                       h.(*DownloaderImpl).mutex.Lock()
+                       defer h.(*DownloaderImpl).mutex.Unlock()
+                       ExpectWithOffset(offset, len(h.(*DownloaderImpl).path2Job)).To(Equal(to - from))
+                       for i := from; i < to; i++ {
+                               v, ok := h.(*DownloaderImpl).path2Job[paths[i]]
+                               ExpectWithOffset(offset, ok).To(BeTrue(), "i = %d", i)
+                               ExpectWithOffset(offset, v).To(Equal(j), "i = %d", i)
+                       }
+               }
+               expectFail := func(offset int, pathsNo int, msg string) {
+                       eventuallyNoti(offset+1, false, msg)
+                       expectPath(offset+1, 0, pathsNo)
+                       eventuallyInfoEmpty(offset + 1)
+                       sendChange(0, pathsNo, AM_READY)
+                       eventuallyPathEmpty(offset + 1)
+               }
+               defaultSetStatusAndInfo := func() {
+                       gomock.InOrder(
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, ""),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "1 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "2 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "3 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "4 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "5 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "6 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "7 / 7 artifacts ready"),
+                       )
+               }
+               defaultGetConfig := func() {
+                       jc.EXPECT().GetConfig(j).Return(config, nil)
+               }
+               defaultPush := func() {
+                       gomock.InOrder(
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[1]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_1", URI: "image_1"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[2]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_2", URI: "md5_2"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[3]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_0", URI: "uri_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[4]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_1", URI: "uri_1"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[5]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_3", URI: "uri_3"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[6]), nil),
+                       )
+               }
+               defaultCreate := func() {
+                       gomock.InOrder(
+                               am.EXPECT().CreateArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_2"}).Return(ArtifactPath(paths[7]), nil),
+                               am.EXPECT().CreateArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_4"}).Return(ArtifactPath(paths[8]), nil),
+                       )
+               }
+               defaultSetConfig := func() {
+                       jc.EXPECT().SetConfig(j, updatedConfig)
+               }
+               It("should delegate downloading of all artifacts successfully", func() {
+                       defaultSetStatusAndInfo()
+                       defaultGetConfig()
+                       defaultPush()
+                       defaultCreate()
+                       defaultSetConfig()
+
+                       h.Download(j)
+
+                       expectPath(1, 0, 7)
+                       expectInfo(1, true, 7)
+
+                       sendChange(0, 7, AM_READY)
+
+                       eventuallyNoti(1, true, "")
+                       eventuallyEmpty(1)
+               })
+               It("should fail if cannot set config", func() {
+                       jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+                       defaultGetConfig()
+                       defaultPush()
+                       defaultCreate()
+                       jc.EXPECT().SetConfig(j, updatedConfig).Return(err)
+
+                       h.Download(j)
+
+                       expectFail(1, 7, "Internal Weles error while setting config : test error")
+               })
+               It("should fail if pull fails", func() {
+                       jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+                       defaultGetConfig()
+                       gomock.InOrder(
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[1]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_1", URI: "image_1"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[2]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_2", URI: "md5_2"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[3]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_0", URI: "uri_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[4]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_1", URI: "uri_1"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[5]), nil),
+                       )
+                       am.EXPECT().CreateArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_2"}).Return(ArtifactPath(""), err)
+
+                       h.Download(j)
+
+                       expectFail(1, 6, "Internal Weles error while creating a new path in ArtifactManager : test error")
+               })
+               It("should fail if push for TESTFILE fails", func() {
+                       jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+                       jc.EXPECT().GetConfig(j).Return(config, nil)
+
+                       gomock.InOrder(
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[1]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_1", URI: "image_1"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[2]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_2", URI: "md5_2"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[3]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_0", URI: "uri_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(""), err),
+                       )
+
+                       h.Download(j)
+
+                       expectFail(1, 4, "Internal Weles error while registering URI:<uri_0> in ArtifactManager : test error")
+               })
+               It("should fail if push for MD5 fails", func() {
+                       jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+                       jc.EXPECT().GetConfig(j).Return(config, nil)
+
+                       gomock.InOrder(
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(""), err),
+                       )
+
+                       h.Download(j)
+
+                       expectFail(1, 1, "Internal Weles error while registering URI:<md5_0> in ArtifactManager : test error")
+               })
+               It("should fail if push for image fails", func() {
+                       jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+                       jc.EXPECT().GetConfig(j).Return(config, nil)
+
+                       gomock.InOrder(
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[1]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_1", URI: "image_1"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(""), err),
+                       )
+
+                       h.Download(j)
+
+                       expectFail(1, 2, "Internal Weles error while registering URI:<image_1> in ArtifactManager : test error")
+               })
+               It("should fail if getting config fails", func() {
+                       jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+                       jc.EXPECT().GetConfig(j).Return(Config{}, err)
+
+                       h.Download(j)
+
+                       expectFail(1, 0, "Internal Weles error while getting Job config : test error")
+               })
+               It("should fail if setting status fails", func() {
+                       jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "").Return(err)
+
+                       h.Download(j)
+
+                       expectFail(1, 0, "Internal Weles error while changing Job status : test error")
+               })
+               It("should succeed when there is nothing to download", func() {
+                       emptyConfig := Config{Action: Action{
+                               Deploy: Deploy{Images: []ImageDefinition{}},
+                               Test: Test{TestCases: []TestCase{
+                                       TestCase{TestActions: []TestAction{
+                                               Pull{Alias: "alias_2"},
+                                       }},
+                                       TestCase{TestActions: []TestAction{
+                                               Pull{Alias: "alias_4"},
+                                       }},
+                               }},
+                       }}
+                       emptyUpdatedConfig := Config{Action: Action{
+                               Deploy: Deploy{Images: []ImageDefinition{}},
+                               Test: Test{TestCases: []TestCase{
+                                       TestCase{TestActions: []TestAction{
+                                               Pull{Alias: "alias_2", Path: paths[7]},
+                                       }},
+                                       TestCase{TestActions: []TestAction{
+                                               Pull{Alias: "alias_4", Path: paths[8]},
+                                       }},
+                               }},
+                       }}
+
+                       jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+                       jc.EXPECT().GetConfig(j).Return(emptyConfig, nil)
+
+                       gomock.InOrder(
+                               am.EXPECT().CreateArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_2"}).Return(ArtifactPath(paths[7]), nil),
+                               am.EXPECT().CreateArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_4"}).Return(ArtifactPath(paths[8]), nil),
+                       )
+                       jc.EXPECT().SetConfig(j, emptyUpdatedConfig)
+
+                       h.Download(j)
+
+                       eventuallyEmpty(1)
+                       eventuallyNoti(1, true, "")
+               })
+               It("should handle downloading failure", func() {
+                       gomock.InOrder(
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, ""),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "1 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "2 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "3 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "Failed to download artifact"),
+                       )
+                       defaultGetConfig()
+                       defaultPush()
+                       defaultCreate()
+                       defaultSetConfig()
+
+                       h.Download(j)
+
+                       expectPath(1, 0, 7)
+                       expectInfo(1, true, 7)
+
+                       sendChange(0, 3, AM_READY)
+                       sendChange(3, 4, AM_FAILED)
+
+                       eventuallyNoti(1, false, "Failed to download all artifacts for the Job")
+                       expectPath(1, 4, 7)
+                       eventuallyInfoEmpty(1)
+
+                       sendChange(4, 7, AM_DOWNLOADING)
+                       eventuallyPathEmpty(1)
+               })
+               It("should block reply until configuration is saved and all artifacts are downloaded", func() {
+                       defaultSetStatusAndInfo()
+                       defaultGetConfig()
+                       defaultPush()
+                       defaultCreate()
+
+                       holdDownload := sync.WaitGroup{}
+                       holdDownload.Add(1)
+                       setConfigReached := sync.WaitGroup{}
+                       setConfigReached.Add(1)
+
+                       jc.EXPECT().SetConfig(j, updatedConfig).Do(func(JobID, Config) {
+                               setConfigReached.Done()
+                               holdDownload.Wait()
+                       })
+
+                       go h.Download(j)
+
+                       setConfigReached.Wait()
+
+                       expectPath(1, 0, 7)
+                       expectInfo(1, false, 7)
+
+                       sendChange(0, 7, AM_READY)
+                       holdDownload.Done()
+
+                       eventuallyNoti(1, true, "")
+                       eventuallyEmpty(1)
+               })
+               It("should handle failure in updating info", func() {
+                       gomock.InOrder(
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, ""),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "1 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "2 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "3 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "4 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "5 / 7 artifacts ready").Return(err),
+                       )
+                       defaultGetConfig()
+                       defaultPush()
+                       defaultCreate()
+                       defaultSetConfig()
+
+                       h.Download(j)
+
+                       expectPath(1, 0, 7)
+                       expectInfo(1, true, 7)
+
+                       sendChange(0, 7, AM_READY)
+
+                       eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error")
+                       eventuallyEmpty(1)
+               })
+               It("should leave no data left if failure response is sent while still processing config", func() {
+                       gomock.InOrder(
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, ""),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "1 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "2 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "3 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "4 / 7 artifacts ready"),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "5 / 7 artifacts ready").Return(err),
+                       )
+                       defaultGetConfig()
+                       defaultPush()
+                       defaultCreate()
+
+                       holdDownload := sync.WaitGroup{}
+                       holdDownload.Add(1)
+                       setConfigReached := sync.WaitGroup{}
+                       setConfigReached.Add(1)
+
+                       jc.EXPECT().SetConfig(j, updatedConfig).Do(func(JobID, Config) {
+                               setConfigReached.Done()
+                               holdDownload.Wait()
+                       })
+
+                       go h.Download(j)
+                       setConfigReached.Wait()
+
+                       expectPath(1, 0, 7)
+                       expectInfo(1, false, 7)
+
+                       sendChange(0, 7, AM_READY)
+
+                       eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error")
+
+                       holdDownload.Done()
+
+                       eventuallyEmpty(1)
+               })
+               It("should leave no data left if failure response is sent while pushing", func() {
+                       gomock.InOrder(
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, ""),
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "1 / 1 artifacts ready").Return(err),
+                       )
+                       defaultGetConfig()
+                       holdDownload := sync.WaitGroup{}
+                       holdDownload.Add(1)
+                       pushReached := sync.WaitGroup{}
+                       pushReached.Add(1)
+                       gomock.InOrder(
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+                               am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+                                       h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[1]), nil).Do(func(ArtifactDescription, chan ArtifactStatusChange) {
+                                       pushReached.Done()
+                                       holdDownload.Wait()
+                               }),
+                       )
+
+                       go h.Download(j)
+                       pushReached.Wait()
+
+                       expectPath(1, 0, 1)
+                       expectInfo(1, false, 1)
+
+                       sendChange(0, 1, AM_READY)
+
+                       eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error")
+
+                       holdDownload.Done()
+                       sendChange(1, 2, AM_READY)
+
+                       eventuallyEmpty(1)
+               })
+               It("should ignore changes to non-terminal states", func() {
+                       defaultSetStatusAndInfo()
+                       defaultGetConfig()
+                       defaultPush()
+                       defaultCreate()
+                       defaultSetConfig()
+
+                       h.Download(j)
+
+                       expectPath(1, 0, 7)
+                       expectInfo(1, true, 7)
+
+                       sendChange(0, 7, AM_DOWNLOADING)
+                       sendChange(0, 7, AM_PENDING)
+
+                       expectPath(1, 0, 7)
+                       expectInfo(1, true, 7)
+
+                       sendChange(0, 7, AM_READY)
+
+                       eventuallyNoti(1, true, "")
+                       eventuallyEmpty(1)
+               })
+       })
+})