--- /dev/null
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+// File controller/downloaderimpl.go implements Downloader.
+
+package controller
+
+import (
+ "fmt"
+ "sync"
+
+ . "git.tizen.org/tools/weles"
+ "git.tizen.org/tools/weles/controller/notifier"
+)
+
+// jobArtifactsInfo contains information about progress of downloading
+// artifacts required by a single Job.
+type jobArtifactsInfo struct {
+ paths int
+ ready int
+ failed int
+ configSaved bool
+}
+
+// DownloaderImpl implements delegating downloading of artifacts required
+// to run Jobs to ArtifactsManager, monitors progress and notifies
+// Controller, when all files are ready.
+type DownloaderImpl struct {
+ // Notifier provides channel for communication with Controller.
+ notifier.Notifier
+ // jobs references module implementing Jobs management.
+ jobs JobsController
+ // artifacts references Weles module implementing ArtifactManager for
+ // managing ArtifactsDB.
+ artifacts ArtifactManager
+ // collector gathers artifact status changes from ArtifactManager
+ collector chan ArtifactStatusChange
+
+ // path2Job identifies Job related to the artifact path.
+ path2Job map[string]JobID
+ // info contains information about progress of downloading artifacts
+ // for Jobs.
+ info map[JobID]jobArtifactsInfo
+ //mutex protects access to path2Job and info maps.
+ mutex *sync.Mutex
+}
+
+// NewDownloader creates a new DownloaderImpl structure setting up references
+// to used Weles modules.
+func NewDownloader(j JobsController, a ArtifactManager) Downloader {
+ ret := &DownloaderImpl{
+ Notifier: notifier.NewNotifier(),
+ jobs: j,
+ artifacts: a,
+ collector: make(chan ArtifactStatusChange),
+ path2Job: make(map[string]JobID),
+ info: make(map[JobID]jobArtifactsInfo),
+ mutex: new(sync.Mutex),
+ }
+ go ret.loop()
+ return ret
+}
+
+// pathStatusChange reacts on notification from ArtifactManager and updates
+// path and job structures.
+func (h *DownloaderImpl) pathStatusChange(path string, status ArtifactStatus) (changed bool, j JobID, info string) {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+ j, ok := h.path2Job[path]
+ if !ok {
+ return
+ }
+ i, ok := h.info[j]
+ if !ok {
+ delete(h.path2Job, path)
+ return
+ }
+ switch status {
+ case AM_READY:
+ i.ready++
+ info = fmt.Sprintf("%d / %d artifacts ready", i.ready, i.paths)
+ case AM_FAILED:
+ i.failed++
+ info = "Failed to download artifact"
+ default:
+ return
+ }
+ h.info[j] = i
+ delete(h.path2Job, path)
+ changed = true
+ return
+}
+
+// removePath removes mapping from the path to related Job.
+func (h *DownloaderImpl) removePath(path string) {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+ delete(h.path2Job, path)
+}
+
+// loop handles all notifications from ArtficatManager, updates
+// jobArtficatsInfo and send notification to Controller, when the answer
+// is ready.
+//
+// It is run in a separate goroutine - one for all jobs.
+func (h *DownloaderImpl) loop() {
+ for {
+ change := <-h.collector
+ update, j, info := h.pathStatusChange(string(change.Path), change.NewStatus)
+ if !update {
+ continue
+ }
+
+ err := h.jobs.SetStatusAndInfo(j, JOB_DOWNLOADING, info)
+ if err != nil {
+ h.removePath(string(change.Path))
+ h.fail(j, fmt.Sprintf("Internal Weles error while changing Job status : %s", err.Error()))
+ }
+ h.sendIfReady(j)
+ }
+}
+
+// fail responses failure to Controller.
+func (h *DownloaderImpl) fail(j JobID, msg string) {
+ if h.removeJobInfo(j) == nil {
+ h.SendFail(j, msg)
+ }
+}
+
+// succeed responses success to Controller.
+func (h *DownloaderImpl) succeed(j JobID) {
+ if h.removeJobInfo(j) == nil {
+ h.SendOK(j)
+ }
+}
+
+// addJobInfo creates a jobArtifactInfo structure.
+func (h *DownloaderImpl) addJobInfo(j JobID) {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+ i, ok := h.info[j]
+ if !ok {
+ i = jobArtifactsInfo{}
+ }
+ h.info[j] = i
+}
+
+// removeJobInfo removes a jobArtifactInfo structure.
+func (h *DownloaderImpl) removeJobInfo(j JobID) error {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+
+ _, ok := h.info[j]
+ if !ok {
+ return ErrJobNotFound
+ }
+ delete(h.info, j)
+ return nil
+}
+
+// push delegates downloading single uri to ArtifactDB.
+func (h *DownloaderImpl) push(j JobID, t ArtifactType, alias string, uri string) (string, error) {
+ p, err := h.artifacts.PushArtifact(ArtifactDescription{
+ JobID: j,
+ Type: t,
+ Alias: ArtifactAlias(alias),
+ URI: ArtifactURI(uri),
+ }, h.collector)
+ if err != nil {
+ return "", err
+ }
+
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+
+ i, ok := h.info[j]
+ if !ok {
+ return "", ErrJobNotFound
+ }
+ i.paths++
+ h.info[j] = i
+ h.path2Job[string(p)] = j
+
+ return string(p), nil
+}
+
+// pullCreate creates a new path for pull artifact.
+func (h *DownloaderImpl) pullCreate(j JobID, alias string) (string, error) {
+ p, err := h.artifacts.CreateArtifact(ArtifactDescription{
+ JobID: j,
+ Type: AM_TESTFILE,
+ Alias: ArtifactAlias(alias),
+ })
+ return string(p), err
+}
+
+// configSaved updates info structure.
+func (h *DownloaderImpl) configSaved(j JobID) {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+
+ i, ok := h.info[j]
+ if !ok {
+ return
+ }
+
+ i.configSaved = true
+ h.info[j] = i
+}
+
+// verify if an answer to the Controller should be send.
+func (h *DownloaderImpl) verify(j JobID) (ok, fail bool) {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+
+ i, ok := h.info[j]
+ if !ok { // Job is not monitored (maybe it has been responded already).
+ return false, false
+ }
+ if i.failed > 0 { // Some artifacts fail to be downloaded.
+ return false, true
+ }
+ if !i.configSaved { // Config is not yet fully analyzed and saved.
+ return false, false
+ }
+ if i.ready == i.paths { // All artifacts are ready.
+ return true, false
+ }
+ return false, false
+}
+
+// sendIfReady sends an answer to the Controller if it is ready.
+func (h *DownloaderImpl) sendIfReady(j JobID) {
+ ok, failed := h.verify(j)
+
+ if ok {
+ h.succeed(j)
+ return
+ }
+ if failed {
+ h.fail(j, "Failed to download all artifacts for the Job")
+ }
+}
+
+// Download parses Job's config and delegates to ArtifactManager downloading
+// of all images and files to bu pushed during Job execution. It also creates
+// ArtifactDB paths for files that will be pulled from Dryad.
+func (h *DownloaderImpl) Download(j JobID) {
+ h.addJobInfo(j)
+
+ err := h.jobs.SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+ if err != nil {
+ h.fail(j, fmt.Sprintf("Internal Weles error while changing Job status : %s", err.Error()))
+ return
+ }
+
+ config, err := h.jobs.GetConfig(j)
+ if err != nil {
+ h.fail(j, fmt.Sprintf("Internal Weles error while getting Job config : %s", err.Error()))
+ return
+ }
+
+ for i, image := range config.Action.Deploy.Images {
+ if image.Uri != "" {
+ path, err := h.push(j, AM_IMAGEFILE, fmt.Sprintf("Image_%d", i), image.Uri)
+ if err != nil {
+ h.fail(j, fmt.Sprintf("Internal Weles error while registering URI:<%s> in ArtifactManager : %s", image.Uri, err.Error()))
+ return
+ }
+ config.Action.Deploy.Images[i].Path = path
+ }
+ if image.Md5Uri != "" {
+ path, err := h.push(j, AM_IMAGEFILE, fmt.Sprintf("ImageMD5_%d", i), image.Md5Uri)
+ if err != nil {
+ h.fail(j, fmt.Sprintf("Internal Weles error while registering URI:<%s> in ArtifactManager : %s", image.Md5Uri, err.Error()))
+ return
+ }
+ config.Action.Deploy.Images[i].MD5Path = path
+ }
+ }
+ for i, tc := range config.Action.Test.TestCases {
+ for k, ta := range tc.TestActions {
+ switch ta.(type) {
+ case Push:
+ action := ta.(Push)
+ path, err := h.push(j, AM_TESTFILE, action.Alias, action.Uri)
+ if err != nil {
+ h.fail(j, fmt.Sprintf("Internal Weles error while registering URI:<%s> in ArtifactManager : %s", action.Uri, err.Error()))
+ return
+ }
+ action.Path = path
+ config.Action.Test.TestCases[i].TestActions[k] = action
+ case Pull:
+ action := ta.(Pull)
+ path, err := h.pullCreate(j, action.Alias)
+ if err != nil {
+ h.fail(j, fmt.Sprintf("Internal Weles error while creating a new path in ArtifactManager : %s", err.Error()))
+ return
+ }
+ action.Path = path
+ config.Action.Test.TestCases[i].TestActions[k] = action
+ }
+ }
+ }
+
+ err = h.jobs.SetConfig(j, config)
+ if err != nil {
+ h.fail(j, fmt.Sprintf("Internal Weles error while setting config : %s", err.Error()))
+ return
+ }
+
+ h.configSaved(j)
+ h.sendIfReady(j)
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package controller
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+
+ . "git.tizen.org/tools/weles"
+ cmock "git.tizen.org/tools/weles/controller/mock"
+ . "git.tizen.org/tools/weles/controller/notifier"
+ mock "git.tizen.org/tools/weles/mock"
+ gomock "github.com/golang/mock/gomock"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("DownloaderImpl", func() {
+ var r <-chan Notification
+ var jc *cmock.MockJobsController
+ var am *mock.MockArtifactManager
+ var h Downloader
+ var ctrl *gomock.Controller
+ j := JobID(0xCAFE)
+ paths := []string{}
+ for i := 0; i < 9; i++ {
+ paths = append(paths, fmt.Sprintf("path_%d", i))
+ }
+ config := Config{Action: Action{
+ Deploy: Deploy{Images: []ImageDefinition{
+ ImageDefinition{Uri: "image_0", Md5Uri: "md5_0"},
+ ImageDefinition{Uri: "image_1"},
+ ImageDefinition{Md5Uri: "md5_2"},
+ }},
+ Test: Test{TestCases: []TestCase{
+ TestCase{TestActions: []TestAction{
+ Push{Uri: "uri_0", Alias: "alias_0"},
+ Push{Uri: "uri_1", Alias: "alias_1"},
+ Pull{Alias: "alias_2"},
+ }},
+ TestCase{TestActions: []TestAction{
+ Push{Uri: "uri_3", Alias: "alias_3"},
+ }},
+ TestCase{TestActions: []TestAction{
+ Pull{Alias: "alias_4"},
+ }},
+ }},
+ }}
+ updatedConfig := Config{Action: Action{
+ Deploy: Deploy{Images: []ImageDefinition{
+ ImageDefinition{Uri: "image_0", Md5Uri: "md5_0", Path: paths[0], MD5Path: paths[1]},
+ ImageDefinition{Uri: "image_1", Path: paths[2]},
+ ImageDefinition{Md5Uri: "md5_2", MD5Path: paths[3]},
+ }},
+ Test: Test{TestCases: []TestCase{
+ TestCase{TestActions: []TestAction{
+ Push{Uri: "uri_0", Alias: "alias_0", Path: paths[4]},
+ Push{Uri: "uri_1", Alias: "alias_1", Path: paths[5]},
+ Pull{Alias: "alias_2", Path: paths[7]},
+ }},
+ TestCase{TestActions: []TestAction{
+ Push{Uri: "uri_3", Alias: "alias_3", Path: paths[6]},
+ }},
+ TestCase{TestActions: []TestAction{
+ Pull{Alias: "alias_4", Path: paths[8]},
+ }},
+ }},
+ }}
+ err := errors.New("test error")
+
+ BeforeEach(func() {
+ ctrl = gomock.NewController(GinkgoT())
+
+ jc = cmock.NewMockJobsController(ctrl)
+ am = mock.NewMockArtifactManager(ctrl)
+
+ h = NewDownloader(jc, am)
+ r = h.Listen()
+ })
+ AfterEach(func() {
+ ctrl.Finish()
+ })
+ Describe("NewDownloader", func() {
+ It("should create a new object", func() {
+ Expect(h).NotTo(BeNil())
+ Expect(h.(*DownloaderImpl).jobs).To(Equal(jc))
+ Expect(h.(*DownloaderImpl).artifacts).To(Equal(am))
+ Expect(h.(*DownloaderImpl).collector).NotTo(BeNil())
+ Expect(h.(*DownloaderImpl).path2Job).NotTo(BeNil())
+ Expect(h.(*DownloaderImpl).info).NotTo(BeNil())
+ Expect(h.(*DownloaderImpl).mutex).NotTo(BeNil())
+ })
+ })
+ Describe("Download", func() {
+ sendChange := func(from, to int, status ArtifactStatus) {
+ for i := from; i < to; i++ {
+ h.(*DownloaderImpl).collector <- ArtifactStatusChange{Path: ArtifactPath(paths[i]), NewStatus: status}
+ }
+ }
+ eventuallyNoti := func(offset int, ok bool, msg string) {
+ notification := Notification{}
+ expectedNotification := Notification{
+ JobID: j,
+ OK: ok,
+ Msg: msg,
+ }
+ EventuallyWithOffset(offset, r).Should(Receive(¬ification))
+ ExpectWithOffset(offset, notification).To(Equal(expectedNotification))
+ }
+ eventuallyPathEmpty := func(offset int) {
+ EventuallyWithOffset(offset, func() int {
+ h.(*DownloaderImpl).mutex.Lock()
+ defer h.(*DownloaderImpl).mutex.Unlock()
+ return len(h.(*DownloaderImpl).path2Job)
+ }).Should(BeZero())
+ }
+ eventuallyInfoEmpty := func(offset int) {
+ EventuallyWithOffset(offset, func() int {
+ h.(*DownloaderImpl).mutex.Lock()
+ defer h.(*DownloaderImpl).mutex.Unlock()
+ return len(h.(*DownloaderImpl).info)
+ }).Should(BeZero())
+ }
+ eventuallyEmpty := func(offset int) {
+ eventuallyPathEmpty(offset + 1)
+ eventuallyInfoEmpty(offset + 1)
+ }
+ expectInfo := func(offset int, config bool, paths int) {
+ h.(*DownloaderImpl).mutex.Lock()
+ defer h.(*DownloaderImpl).mutex.Unlock()
+ ExpectWithOffset(offset, len(h.(*DownloaderImpl).info)).To(Equal(1))
+ v, ok := h.(*DownloaderImpl).info[j]
+ ExpectWithOffset(offset, ok).To(BeTrue())
+ ExpectWithOffset(offset, v.configSaved).To(Equal(config))
+ ExpectWithOffset(offset, v.failed).To(Equal(0))
+ ExpectWithOffset(offset, v.paths).To(Equal(paths))
+ ExpectWithOffset(offset, v.ready).To(Equal(0))
+ }
+ expectPath := func(offset, from, to int) {
+ h.(*DownloaderImpl).mutex.Lock()
+ defer h.(*DownloaderImpl).mutex.Unlock()
+ ExpectWithOffset(offset, len(h.(*DownloaderImpl).path2Job)).To(Equal(to - from))
+ for i := from; i < to; i++ {
+ v, ok := h.(*DownloaderImpl).path2Job[paths[i]]
+ ExpectWithOffset(offset, ok).To(BeTrue(), "i = %d", i)
+ ExpectWithOffset(offset, v).To(Equal(j), "i = %d", i)
+ }
+ }
+ expectFail := func(offset int, pathsNo int, msg string) {
+ eventuallyNoti(offset+1, false, msg)
+ expectPath(offset+1, 0, pathsNo)
+ eventuallyInfoEmpty(offset + 1)
+ sendChange(0, pathsNo, AM_READY)
+ eventuallyPathEmpty(offset + 1)
+ }
+ defaultSetStatusAndInfo := func() {
+ gomock.InOrder(
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, ""),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "1 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "2 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "3 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "4 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "5 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "6 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "7 / 7 artifacts ready"),
+ )
+ }
+ defaultGetConfig := func() {
+ jc.EXPECT().GetConfig(j).Return(config, nil)
+ }
+ defaultPush := func() {
+ gomock.InOrder(
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[1]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_1", URI: "image_1"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[2]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_2", URI: "md5_2"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[3]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_0", URI: "uri_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[4]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_1", URI: "uri_1"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[5]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_3", URI: "uri_3"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[6]), nil),
+ )
+ }
+ defaultCreate := func() {
+ gomock.InOrder(
+ am.EXPECT().CreateArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_2"}).Return(ArtifactPath(paths[7]), nil),
+ am.EXPECT().CreateArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_4"}).Return(ArtifactPath(paths[8]), nil),
+ )
+ }
+ defaultSetConfig := func() {
+ jc.EXPECT().SetConfig(j, updatedConfig)
+ }
+ It("should delegate downloading of all artifacts successfully", func() {
+ defaultSetStatusAndInfo()
+ defaultGetConfig()
+ defaultPush()
+ defaultCreate()
+ defaultSetConfig()
+
+ h.Download(j)
+
+ expectPath(1, 0, 7)
+ expectInfo(1, true, 7)
+
+ sendChange(0, 7, AM_READY)
+
+ eventuallyNoti(1, true, "")
+ eventuallyEmpty(1)
+ })
+ It("should fail if cannot set config", func() {
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+ defaultGetConfig()
+ defaultPush()
+ defaultCreate()
+ jc.EXPECT().SetConfig(j, updatedConfig).Return(err)
+
+ h.Download(j)
+
+ expectFail(1, 7, "Internal Weles error while setting config : test error")
+ })
+ It("should fail if pull fails", func() {
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+ defaultGetConfig()
+ gomock.InOrder(
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[1]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_1", URI: "image_1"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[2]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_2", URI: "md5_2"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[3]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_0", URI: "uri_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[4]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_1", URI: "uri_1"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[5]), nil),
+ )
+ am.EXPECT().CreateArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_2"}).Return(ArtifactPath(""), err)
+
+ h.Download(j)
+
+ expectFail(1, 6, "Internal Weles error while creating a new path in ArtifactManager : test error")
+ })
+ It("should fail if push for TESTFILE fails", func() {
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+ jc.EXPECT().GetConfig(j).Return(config, nil)
+
+ gomock.InOrder(
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[1]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_1", URI: "image_1"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[2]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_2", URI: "md5_2"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[3]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_0", URI: "uri_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(""), err),
+ )
+
+ h.Download(j)
+
+ expectFail(1, 4, "Internal Weles error while registering URI:<uri_0> in ArtifactManager : test error")
+ })
+ It("should fail if push for MD5 fails", func() {
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+ jc.EXPECT().GetConfig(j).Return(config, nil)
+
+ gomock.InOrder(
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(""), err),
+ )
+
+ h.Download(j)
+
+ expectFail(1, 1, "Internal Weles error while registering URI:<md5_0> in ArtifactManager : test error")
+ })
+ It("should fail if push for image fails", func() {
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+ jc.EXPECT().GetConfig(j).Return(config, nil)
+
+ gomock.InOrder(
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[1]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_1", URI: "image_1"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(""), err),
+ )
+
+ h.Download(j)
+
+ expectFail(1, 2, "Internal Weles error while registering URI:<image_1> in ArtifactManager : test error")
+ })
+ It("should fail if getting config fails", func() {
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+ jc.EXPECT().GetConfig(j).Return(Config{}, err)
+
+ h.Download(j)
+
+ expectFail(1, 0, "Internal Weles error while getting Job config : test error")
+ })
+ It("should fail if setting status fails", func() {
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "").Return(err)
+
+ h.Download(j)
+
+ expectFail(1, 0, "Internal Weles error while changing Job status : test error")
+ })
+ It("should succeed when there is nothing to download", func() {
+ emptyConfig := Config{Action: Action{
+ Deploy: Deploy{Images: []ImageDefinition{}},
+ Test: Test{TestCases: []TestCase{
+ TestCase{TestActions: []TestAction{
+ Pull{Alias: "alias_2"},
+ }},
+ TestCase{TestActions: []TestAction{
+ Pull{Alias: "alias_4"},
+ }},
+ }},
+ }}
+ emptyUpdatedConfig := Config{Action: Action{
+ Deploy: Deploy{Images: []ImageDefinition{}},
+ Test: Test{TestCases: []TestCase{
+ TestCase{TestActions: []TestAction{
+ Pull{Alias: "alias_2", Path: paths[7]},
+ }},
+ TestCase{TestActions: []TestAction{
+ Pull{Alias: "alias_4", Path: paths[8]},
+ }},
+ }},
+ }}
+
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "")
+ jc.EXPECT().GetConfig(j).Return(emptyConfig, nil)
+
+ gomock.InOrder(
+ am.EXPECT().CreateArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_2"}).Return(ArtifactPath(paths[7]), nil),
+ am.EXPECT().CreateArtifact(ArtifactDescription{JobID: j, Type: AM_TESTFILE, Alias: "alias_4"}).Return(ArtifactPath(paths[8]), nil),
+ )
+ jc.EXPECT().SetConfig(j, emptyUpdatedConfig)
+
+ h.Download(j)
+
+ eventuallyEmpty(1)
+ eventuallyNoti(1, true, "")
+ })
+ It("should handle downloading failure", func() {
+ gomock.InOrder(
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, ""),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "1 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "2 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "3 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "Failed to download artifact"),
+ )
+ defaultGetConfig()
+ defaultPush()
+ defaultCreate()
+ defaultSetConfig()
+
+ h.Download(j)
+
+ expectPath(1, 0, 7)
+ expectInfo(1, true, 7)
+
+ sendChange(0, 3, AM_READY)
+ sendChange(3, 4, AM_FAILED)
+
+ eventuallyNoti(1, false, "Failed to download all artifacts for the Job")
+ expectPath(1, 4, 7)
+ eventuallyInfoEmpty(1)
+
+ sendChange(4, 7, AM_DOWNLOADING)
+ eventuallyPathEmpty(1)
+ })
+ It("should block reply until configuration is saved and all artifacts are downloaded", func() {
+ defaultSetStatusAndInfo()
+ defaultGetConfig()
+ defaultPush()
+ defaultCreate()
+
+ holdDownload := sync.WaitGroup{}
+ holdDownload.Add(1)
+ setConfigReached := sync.WaitGroup{}
+ setConfigReached.Add(1)
+
+ jc.EXPECT().SetConfig(j, updatedConfig).Do(func(JobID, Config) {
+ setConfigReached.Done()
+ holdDownload.Wait()
+ })
+
+ go h.Download(j)
+
+ setConfigReached.Wait()
+
+ expectPath(1, 0, 7)
+ expectInfo(1, false, 7)
+
+ sendChange(0, 7, AM_READY)
+ holdDownload.Done()
+
+ eventuallyNoti(1, true, "")
+ eventuallyEmpty(1)
+ })
+ It("should handle failure in updating info", func() {
+ gomock.InOrder(
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, ""),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "1 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "2 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "3 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "4 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "5 / 7 artifacts ready").Return(err),
+ )
+ defaultGetConfig()
+ defaultPush()
+ defaultCreate()
+ defaultSetConfig()
+
+ h.Download(j)
+
+ expectPath(1, 0, 7)
+ expectInfo(1, true, 7)
+
+ sendChange(0, 7, AM_READY)
+
+ eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error")
+ eventuallyEmpty(1)
+ })
+ It("should leave no data left if failure response is sent while still processing config", func() {
+ gomock.InOrder(
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, ""),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "1 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "2 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "3 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "4 / 7 artifacts ready"),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "5 / 7 artifacts ready").Return(err),
+ )
+ defaultGetConfig()
+ defaultPush()
+ defaultCreate()
+
+ holdDownload := sync.WaitGroup{}
+ holdDownload.Add(1)
+ setConfigReached := sync.WaitGroup{}
+ setConfigReached.Add(1)
+
+ jc.EXPECT().SetConfig(j, updatedConfig).Do(func(JobID, Config) {
+ setConfigReached.Done()
+ holdDownload.Wait()
+ })
+
+ go h.Download(j)
+ setConfigReached.Wait()
+
+ expectPath(1, 0, 7)
+ expectInfo(1, false, 7)
+
+ sendChange(0, 7, AM_READY)
+
+ eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error")
+
+ holdDownload.Done()
+
+ eventuallyEmpty(1)
+ })
+ It("should leave no data left if failure response is sent while pushing", func() {
+ gomock.InOrder(
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, ""),
+ jc.EXPECT().SetStatusAndInfo(j, JOB_DOWNLOADING, "1 / 1 artifacts ready").Return(err),
+ )
+ defaultGetConfig()
+ holdDownload := sync.WaitGroup{}
+ holdDownload.Add(1)
+ pushReached := sync.WaitGroup{}
+ pushReached.Add(1)
+ gomock.InOrder(
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "Image_0", URI: "image_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[0]), nil),
+ am.EXPECT().PushArtifact(ArtifactDescription{JobID: j, Type: AM_IMAGEFILE, Alias: "ImageMD5_0", URI: "md5_0"},
+ h.(*DownloaderImpl).collector).Return(ArtifactPath(paths[1]), nil).Do(func(ArtifactDescription, chan ArtifactStatusChange) {
+ pushReached.Done()
+ holdDownload.Wait()
+ }),
+ )
+
+ go h.Download(j)
+ pushReached.Wait()
+
+ expectPath(1, 0, 1)
+ expectInfo(1, false, 1)
+
+ sendChange(0, 1, AM_READY)
+
+ eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error")
+
+ holdDownload.Done()
+ sendChange(1, 2, AM_READY)
+
+ eventuallyEmpty(1)
+ })
+ It("should ignore changes to non-terminal states", func() {
+ defaultSetStatusAndInfo()
+ defaultGetConfig()
+ defaultPush()
+ defaultCreate()
+ defaultSetConfig()
+
+ h.Download(j)
+
+ expectPath(1, 0, 7)
+ expectInfo(1, true, 7)
+
+ sendChange(0, 7, AM_DOWNLOADING)
+ sendChange(0, 7, AM_PENDING)
+
+ expectPath(1, 0, 7)
+ expectInfo(1, true, 7)
+
+ sendChange(0, 7, AM_READY)
+
+ eventuallyNoti(1, true, "")
+ eventuallyEmpty(1)
+ })
+ })
+})