From b31518fbe5b3c1cadc7cdb74578801d2d4115e31 Mon Sep 17 00:00:00 2001 From: Lukasz Wojciechowski Date: Mon, 20 Nov 2017 06:04:31 +0100 Subject: [PATCH] Add Downloader implementation with tests DownloaderImpl is responsible for parsing Job's config and searching for information about files (artifacts) related to the test. It delegates to ArtifactsManager downloading images and testfiles required to run Job (Push). It creates also paths for output artifacts (Pull). It updates Job's config with proper paths for these artifacts. It does also monitor process of downloading files in ArtifactManager and finally it notifies Controller when all files are downloaded or in case of failure. Change-Id: I578108dd1a924aaf36435e4dbc5d8dcce50214ee Signed-off-by: Lukasz Wojciechowski --- controller/downloaderimpl.go | 338 +++++++++++++++++++++++++ controller/downloaderimpl_test.go | 518 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 856 insertions(+) create mode 100644 controller/downloaderimpl.go create mode 100644 controller/downloaderimpl_test.go diff --git a/controller/downloaderimpl.go b/controller/downloaderimpl.go new file mode 100644 index 0000000..6f3a14f --- /dev/null +++ b/controller/downloaderimpl.go @@ -0,0 +1,338 @@ +/* + * Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +// File controller/downloaderimpl.go contains Downloader implementation. + +package controller + +import ( + "fmt" + "sync" + + "git.tizen.org/tools/weles" + "git.tizen.org/tools/weles/controller/notifier" +) + +const ( + formatJobStatus = "Internal Weles error while changing Job status : %s" + formatJobConfig = "Internal Weles error while getting Job config : %s" + formatURI = "Internal Weles error while registering URI:<%s> in ArtifactManager : %s" + formatPath = "Internal Weles error while creating a new path in ArtifactManager : %s" + formatConfig = "Internal Weles error while setting config : %s" + formatDownload = "Failed to download some artifacts for the Job" + formatReady = "%d / %d artifacts ready" +) + +// jobArtifactsInfo contains information about progress of downloading +// artifacts required by a single Job. +type jobArtifactsInfo struct { + paths int + ready int + failed int + configSaved bool +} + +// DownloaderImpl implements delegating downloading of artifacts required +// by Jobs to ArtifactsManager, monitors progress and notifies +// Controller, when all files are ready. +type DownloaderImpl struct { + // Notifier provides channel for communication with Controller. + notifier.Notifier + // jobs references module implementing Jobs management. + jobs JobsController + // artifacts references Weles module implementing ArtifactManager for + // managing ArtifactsDB. + artifacts weles.ArtifactManager + // collector gathers artifact status changes from ArtifactManager + collector chan weles.ArtifactStatusChange + + // path2Job identifies Job related to the artifact path. + path2Job map[string]weles.JobID + // info contains information about progress of downloading artifacts + // for Jobs. + info map[weles.JobID]*jobArtifactsInfo + //mutex protects access to path2Job and info maps. + mutex *sync.Mutex +} + +// NewDownloader creates a new DownloaderImpl structure setting up references +// to used Weles modules. +func NewDownloader(j JobsController, a weles.ArtifactManager) Downloader { + ret := &DownloaderImpl{ + Notifier: notifier.NewNotifier(), + jobs: j, + artifacts: a, + collector: make(chan weles.ArtifactStatusChange), + path2Job: make(map[string]weles.JobID), + info: make(map[weles.JobID]*jobArtifactsInfo), + mutex: new(sync.Mutex), + } + go ret.loop() + return ret +} + +// pathStatusChange reacts on notification from ArtifactManager and updates +// path and job structures. +func (h *DownloaderImpl) pathStatusChange(path string, status weles.ArtifactStatus) (changed bool, j weles.JobID, info string) { + h.mutex.Lock() + defer h.mutex.Unlock() + j, ok := h.path2Job[path] + if !ok { + return + } + i, ok := h.info[j] + if !ok { + delete(h.path2Job, path) + return + } + switch status { + case weles.AM_READY: + i.ready++ + info = fmt.Sprintf(formatReady, i.ready, i.paths) + case weles.AM_FAILED: + i.failed++ + info = "Failed to download artifact" + default: + return + } + delete(h.path2Job, path) + changed = true + return +} + +// removePath removes mapping from the path to related Job. +func (h *DownloaderImpl) removePath(path string) { + h.mutex.Lock() + defer h.mutex.Unlock() + delete(h.path2Job, path) +} + +// loop handles all notifications from ArtifactManager, updates +// jobArtifactsInfo and send notification to Controller, when the answer +// is ready. +// +// It is run in a separate goroutine - one for all jobs. +func (h *DownloaderImpl) loop() { + for { + change, open := <-h.collector + if !open { + return + } + update, j, info := h.pathStatusChange(string(change.Path), change.NewStatus) + if !update { + continue + } + + err := h.jobs.SetStatusAndInfo(j, weles.JOB_DOWNLOADING, info) + if err != nil { + h.removePath(string(change.Path)) + h.fail(j, fmt.Sprintf(formatJobStatus, err.Error())) + } + h.sendIfReady(j) + } +} + +// fail responses failure to Controller. +func (h *DownloaderImpl) fail(j weles.JobID, msg string) { + if h.removeJobInfo(j) == nil { + h.SendFail(j, msg) + } +} + +// succeed responses success to Controller. +func (h *DownloaderImpl) succeed(j weles.JobID) { + if h.removeJobInfo(j) == nil { + h.SendOK(j) + } +} + +// initializeJobInfo creates a jobArtifactInfo structure. +func (h *DownloaderImpl) initializeJobInfo(j weles.JobID) { + h.mutex.Lock() + defer h.mutex.Unlock() + _, ok := h.info[j] + if !ok { + h.info[j] = new(jobArtifactsInfo) + } +} + +// removeJobInfo removes a jobArtifactInfo structure. +func (h *DownloaderImpl) removeJobInfo(j weles.JobID) error { + h.mutex.Lock() + defer h.mutex.Unlock() + + _, ok := h.info[j] + if !ok { + return weles.ErrJobNotFound + } + delete(h.info, j) + return nil +} + +// push delegates downloading single uri to ArtifactDB. +func (h *DownloaderImpl) push(j weles.JobID, t weles.ArtifactType, alias string, uri string) (string, error) { + p, err := h.artifacts.PushArtifact(weles.ArtifactDescription{ + JobID: j, + Type: t, + Alias: weles.ArtifactAlias(alias), + URI: weles.ArtifactURI(uri), + }, h.collector) + if err != nil { + return "", err + } + + h.mutex.Lock() + defer h.mutex.Unlock() + + i, ok := h.info[j] + if !ok { + return "", weles.ErrJobNotFound + } + i.paths++ + h.path2Job[string(p)] = j + + return string(p), nil +} + +// pullCreate creates a new path for pull artifact. +func (h *DownloaderImpl) pullCreate(j weles.JobID, alias string) (string, error) { + p, err := h.artifacts.CreateArtifact(weles.ArtifactDescription{ + JobID: j, + Type: weles.AM_TESTFILE, + Alias: weles.ArtifactAlias(alias), + }) + return string(p), err +} + +// configSaved updates info structure. +func (h *DownloaderImpl) configSaved(j weles.JobID) { + h.mutex.Lock() + defer h.mutex.Unlock() + + i, ok := h.info[j] + if !ok { + return + } + + i.configSaved = true +} + +// verify if an answer to the Controller should be send. +func (h *DownloaderImpl) verify(j weles.JobID) (success, send bool) { + h.mutex.Lock() + defer h.mutex.Unlock() + + i, ok := h.info[j] + if !ok { // Job is not monitored (maybe it has been responded already). + return false, false + } + if i.failed > 0 { // Some artifacts fail to be downloaded. + return false, true + } + if !i.configSaved { // Config is not yet fully analyzed and saved. + return false, false + } + if i.ready == i.paths { // All artifacts are ready. + return true, true + } + return false, false +} + +// sendIfReady sends an answer to the Controller if it is ready. +func (h *DownloaderImpl) sendIfReady(j weles.JobID) { + success, send := h.verify(j) + + if !send { + return + } + + if success { + h.succeed(j) + } else { + h.fail(j, formatDownload) + } +} + +// DispatchDownloads parses Job's config and delegates to ArtifactManager downloading +// of all images and files to be pushed during Job execution. It also creates +// ArtifactDB paths for files that will be pulled from Dryad. +func (h *DownloaderImpl) DispatchDownloads(j weles.JobID) { + h.initializeJobInfo(j) + + err := h.jobs.SetStatusAndInfo(j, weles.JOB_DOWNLOADING, "") + if err != nil { + h.fail(j, fmt.Sprintf(formatJobStatus, err.Error())) + return + } + + config, err := h.jobs.GetConfig(j) + if err != nil { + h.fail(j, fmt.Sprintf(formatJobConfig, err.Error())) + return + } + + for i, image := range config.Action.Deploy.Images { + if image.URI != "" { + path, err := h.push(j, weles.AM_IMAGEFILE, fmt.Sprintf("Image_%d", i), image.URI) + if err != nil { + h.fail(j, fmt.Sprintf(formatURI, image.URI, err.Error())) + return + } + config.Action.Deploy.Images[i].Path = path + } + if image.ChecksumURI != "" { + path, err := h.push(j, weles.AM_IMAGEFILE, fmt.Sprintf("ImageMD5_%d", i), image.ChecksumURI) + if err != nil { + h.fail(j, fmt.Sprintf(formatURI, image.ChecksumURI, err.Error())) + return + } + config.Action.Deploy.Images[i].ChecksumPath = path + } + } + for i, tc := range config.Action.Test.TestCases { + for k, ta := range tc.TestActions { + switch ta.(type) { + case weles.Push: + action := ta.(weles.Push) + path, err := h.push(j, weles.AM_TESTFILE, action.Alias, action.URI) + if err != nil { + h.fail(j, fmt.Sprintf(formatURI, action.URI, err.Error())) + return + } + action.Path = path + config.Action.Test.TestCases[i].TestActions[k] = action + case weles.Pull: + action := ta.(weles.Pull) + path, err := h.pullCreate(j, action.Alias) + if err != nil { + h.fail(j, fmt.Sprintf(formatPath, err.Error())) + return + } + action.Path = path + config.Action.Test.TestCases[i].TestActions[k] = action + } + } + } + + err = h.jobs.SetConfig(j, config) + if err != nil { + h.fail(j, fmt.Sprintf(formatConfig, err.Error())) + return + } + + h.configSaved(j) + h.sendIfReady(j) +} diff --git a/controller/downloaderimpl_test.go b/controller/downloaderimpl_test.go new file mode 100644 index 0000000..28e9098 --- /dev/null +++ b/controller/downloaderimpl_test.go @@ -0,0 +1,518 @@ +/* + * Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package controller + +import ( + "errors" + "fmt" + "sync" + + "git.tizen.org/tools/weles" + cmock "git.tizen.org/tools/weles/controller/mock" + "git.tizen.org/tools/weles/controller/notifier" + mock "git.tizen.org/tools/weles/mock" + gomock "github.com/golang/mock/gomock" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("DownloaderImpl", func() { + var r <-chan notifier.Notification + var jc *cmock.MockJobsController + var am *mock.MockArtifactManager + var h *DownloaderImpl + var ctrl *gomock.Controller + j := weles.JobID(0xCAFE) + paths := []string{} + for i := 0; i < 9; i++ { + paths = append(paths, fmt.Sprintf("path_%d", i)) + } + infos := []string{""} + for i := 1; i <= 7; i++ { + infos = append(infos, fmt.Sprintf("%d / 7 artifacts ready", i)) + } + config := weles.Config{Action: weles.Action{ + Deploy: weles.Deploy{Images: []weles.ImageDefinition{ + weles.ImageDefinition{URI: "image_0", ChecksumURI: "md5_0"}, + weles.ImageDefinition{URI: "image_1"}, + weles.ImageDefinition{ChecksumURI: "md5_2"}, + }}, + Test: weles.Test{TestCases: []weles.TestCase{ + weles.TestCase{TestActions: []weles.TestAction{ + weles.Push{URI: "uri_0", Alias: "alias_0"}, + weles.Push{URI: "uri_1", Alias: "alias_1"}, + weles.Pull{Alias: "alias_2"}, + }}, + weles.TestCase{TestActions: []weles.TestAction{ + weles.Push{URI: "uri_3", Alias: "alias_3"}, + }}, + weles.TestCase{TestActions: []weles.TestAction{ + weles.Pull{Alias: "alias_4"}, + }}, + }}, + }} + updatedConfig := weles.Config{Action: weles.Action{ + Deploy: weles.Deploy{Images: []weles.ImageDefinition{ + weles.ImageDefinition{URI: "image_0", ChecksumURI: "md5_0", Path: paths[0], ChecksumPath: paths[1]}, + weles.ImageDefinition{URI: "image_1", Path: paths[2]}, + weles.ImageDefinition{ChecksumURI: "md5_2", ChecksumPath: paths[3]}, + }}, + Test: weles.Test{TestCases: []weles.TestCase{ + weles.TestCase{TestActions: []weles.TestAction{ + weles.Push{URI: "uri_0", Alias: "alias_0", Path: paths[4]}, + weles.Push{URI: "uri_1", Alias: "alias_1", Path: paths[5]}, + weles.Pull{Alias: "alias_2", Path: paths[7]}, + }}, + weles.TestCase{TestActions: []weles.TestAction{ + weles.Push{URI: "uri_3", Alias: "alias_3", Path: paths[6]}, + }}, + weles.TestCase{TestActions: []weles.TestAction{ + weles.Pull{Alias: "alias_4", Path: paths[8]}, + }}, + }}, + }} + err := errors.New("test error") + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + + jc = cmock.NewMockJobsController(ctrl) + am = mock.NewMockArtifactManager(ctrl) + + h = NewDownloader(jc, am).(*DownloaderImpl) + r = h.Listen() + }) + AfterEach(func() { + ctrl.Finish() + }) + Describe("NewDownloader", func() { + It("should create a new object", func() { + Expect(h).NotTo(BeNil()) + Expect(h.jobs).To(Equal(jc)) + Expect(h.artifacts).To(Equal(am)) + Expect(h.collector).NotTo(BeNil()) + Expect(h.path2Job).NotTo(BeNil()) + Expect(h.info).NotTo(BeNil()) + Expect(h.mutex).NotTo(BeNil()) + }) + }) + Describe("Loop", func() { + It("should stop loop function after closing collector channel", func() { + close(h.collector) + }) + }) + Describe("DispatchDownloads", func() { + sendChange := func(from, to int, status weles.ArtifactStatus) { + for i := from; i < to; i++ { + h.collector <- weles.ArtifactStatusChange{Path: weles.ArtifactPath(paths[i]), NewStatus: status} + } + } + eventuallyNoti := func(offset int, ok bool, msg string) { + expectedNotification := notifier.Notification{ + JobID: j, + OK: ok, + Msg: msg, + } + EventuallyWithOffset(offset, r).Should(Receive(Equal(expectedNotification))) + } + eventuallyPathEmpty := func(offset int) { + EventuallyWithOffset(offset, func() int { + h.mutex.Lock() + defer h.mutex.Unlock() + return len(h.path2Job) + }).Should(BeZero()) + } + eventuallyInfoEmpty := func(offset int) { + EventuallyWithOffset(offset, func() int { + h.mutex.Lock() + defer h.mutex.Unlock() + return len(h.info) + }).Should(BeZero()) + } + eventuallyEmpty := func(offset int) { + eventuallyPathEmpty(offset + 1) + eventuallyInfoEmpty(offset + 1) + } + expectInfo := func(offset int, config bool, paths int) { + h.mutex.Lock() + defer h.mutex.Unlock() + ExpectWithOffset(offset, len(h.info)).To(Equal(1)) + v, ok := h.info[j] + ExpectWithOffset(offset, ok).To(BeTrue()) + ExpectWithOffset(offset, v.configSaved).To(Equal(config)) + ExpectWithOffset(offset, v.failed).To(Equal(0)) + ExpectWithOffset(offset, v.paths).To(Equal(paths)) + ExpectWithOffset(offset, v.ready).To(Equal(0)) + } + expectPath := func(offset, from, to int) { + h.mutex.Lock() + defer h.mutex.Unlock() + ExpectWithOffset(offset, len(h.path2Job)).To(Equal(to - from)) + for i := from; i < to; i++ { + v, ok := h.path2Job[paths[i]] + ExpectWithOffset(offset, ok).To(BeTrue(), "i = %d", i) + ExpectWithOffset(offset, v).To(Equal(j), "i = %d", i) + } + } + expectFail := func(offset int, pathsNo int, msg string) { + eventuallyNoti(offset+1, false, msg) + expectPath(offset+1, 0, pathsNo) + eventuallyInfoEmpty(offset + 1) + sendChange(0, pathsNo, weles.AM_READY) + eventuallyPathEmpty(offset + 1) + } + defaultSetStatusAndInfo := func(successfulEntries int, fail bool) *gomock.Call { + var i int + var prev, call *gomock.Call + + for i = 0; i < successfulEntries; i++ { + call = jc.EXPECT().SetStatusAndInfo(j, weles.JOB_DOWNLOADING, infos[i]) + if prev != nil { + call.After(prev) + } + prev = call + } + if fail { + call = jc.EXPECT().SetStatusAndInfo(j, weles.JOB_DOWNLOADING, infos[i]).Return(err) + if prev != nil { + call.After(prev) + } + } + return call + } + defaultGetConfig := func() { + jc.EXPECT().GetConfig(j).Return(config, nil) + } + defaultPush := func(successfulEntries int, fail bool) *gomock.Call { + types := []weles.ArtifactType{weles.AM_IMAGEFILE, weles.AM_IMAGEFILE, weles.AM_IMAGEFILE, weles.AM_IMAGEFILE, weles.AM_TESTFILE, weles.AM_TESTFILE, weles.AM_TESTFILE} + aliases := []weles.ArtifactAlias{"Image_0", "ImageMD5_0", "Image_1", "ImageMD5_2", "alias_0", "alias_1", "alias_3"} + uris := []weles.ArtifactURI{"image_0", "md5_0", "image_1", "md5_2", "uri_0", "uri_1", "uri_3"} + var i int + var prev, call *gomock.Call + + for i = 0; i < successfulEntries; i++ { + call = am.EXPECT().PushArtifact(weles.ArtifactDescription{JobID: j, Type: types[i], Alias: aliases[i], URI: uris[i]}, + h.collector).Return(weles.ArtifactPath(paths[i]), nil) + if prev != nil { + call.After(prev) + } + prev = call + } + if fail { + call = am.EXPECT().PushArtifact(weles.ArtifactDescription{JobID: j, Type: types[i], Alias: aliases[i], URI: uris[i]}, + h.collector).Return(weles.ArtifactPath(""), err) + if prev != nil { + call.After(prev) + } + } + return call + } + defaultCreate := func(successfulEntries int, fail bool) *gomock.Call { + types := []weles.ArtifactType{weles.AM_TESTFILE, weles.AM_TESTFILE} + aliases := []weles.ArtifactAlias{"alias_2", "alias_4"} + returnPaths := []weles.ArtifactPath{weles.ArtifactPath(paths[7]), weles.ArtifactPath(paths[8])} + var i int + var prev, call *gomock.Call + + for i = 0; i < successfulEntries; i++ { + call = am.EXPECT().CreateArtifact(weles.ArtifactDescription{JobID: j, Type: types[i], Alias: aliases[i]}). + Return(returnPaths[i], nil) + if prev != nil { + call.After(prev) + } + prev = call + } + if fail { + call = am.EXPECT().CreateArtifact(weles.ArtifactDescription{JobID: j, Type: types[i], Alias: aliases[i]}). + Return(weles.ArtifactPath(""), err) + if prev != nil { + call.After(prev) + } + } + return call + } + defaultSetConfig := func() { + jc.EXPECT().SetConfig(j, updatedConfig) + } + It("should delegate downloading of all artifacts successfully", func() { + defaultSetStatusAndInfo(8, false) + defaultGetConfig() + defaultPush(7, false) + defaultCreate(2, false) + defaultSetConfig() + + h.DispatchDownloads(j) + + expectPath(1, 0, 7) + expectInfo(1, true, 7) + + sendChange(0, 7, weles.AM_READY) + + eventuallyNoti(1, true, "") + eventuallyEmpty(1) + }) + It("should fail if cannot set config", func() { + defaultSetStatusAndInfo(1, false) + defaultGetConfig() + defaultPush(7, false) + defaultCreate(2, false) + jc.EXPECT().SetConfig(j, updatedConfig).Return(err) + + h.DispatchDownloads(j) + + expectFail(1, 7, "Internal Weles error while setting config : test error") + }) + It("should fail if pull fails", func() { + defaultSetStatusAndInfo(1, false) + defaultGetConfig() + defaultPush(6, false) + defaultCreate(0, true) + + h.DispatchDownloads(j) + + expectFail(1, 6, "Internal Weles error while creating a new path in ArtifactManager : test error") + }) + It("should fail if push for TESTFILE fails", func() { + defaultSetStatusAndInfo(1, false) + jc.EXPECT().GetConfig(j).Return(config, nil) + defaultPush(4, true) + + h.DispatchDownloads(j) + + expectFail(1, 4, "Internal Weles error while registering URI: in ArtifactManager : test error") + }) + It("should fail if push for MD5 fails", func() { + defaultSetStatusAndInfo(1, false) + jc.EXPECT().GetConfig(j).Return(config, nil) + defaultPush(1, true) + + h.DispatchDownloads(j) + + expectFail(1, 1, "Internal Weles error while registering URI: in ArtifactManager : test error") + }) + It("should fail if push for image fails", func() { + defaultSetStatusAndInfo(1, false) + jc.EXPECT().GetConfig(j).Return(config, nil) + defaultPush(2, true) + + h.DispatchDownloads(j) + + expectFail(1, 2, "Internal Weles error while registering URI: in ArtifactManager : test error") + }) + It("should fail if getting config fails", func() { + defaultSetStatusAndInfo(1, false) + jc.EXPECT().GetConfig(j).Return(weles.Config{}, err) + + h.DispatchDownloads(j) + + expectFail(1, 0, "Internal Weles error while getting Job config : test error") + }) + It("should fail if setting status fails", func() { + defaultSetStatusAndInfo(0, true) + + h.DispatchDownloads(j) + + expectFail(1, 0, "Internal Weles error while changing Job status : test error") + }) + It("should succeed when there is nothing to download", func() { + emptyConfig := weles.Config{Action: weles.Action{ + Deploy: weles.Deploy{Images: []weles.ImageDefinition{}}, + Test: weles.Test{TestCases: []weles.TestCase{ + weles.TestCase{TestActions: []weles.TestAction{ + weles.Pull{Alias: "alias_2"}, + }}, + weles.TestCase{TestActions: []weles.TestAction{ + weles.Pull{Alias: "alias_4"}, + }}, + }}, + }} + emptyUpdatedConfig := weles.Config{Action: weles.Action{ + Deploy: weles.Deploy{Images: []weles.ImageDefinition{}}, + Test: weles.Test{TestCases: []weles.TestCase{ + weles.TestCase{TestActions: []weles.TestAction{ + weles.Pull{Alias: "alias_2", Path: paths[7]}, + }}, + weles.TestCase{TestActions: []weles.TestAction{ + weles.Pull{Alias: "alias_4", Path: paths[8]}, + }}, + }}, + }} + + defaultSetStatusAndInfo(1, false) + jc.EXPECT().GetConfig(j).Return(emptyConfig, nil) + + defaultCreate(2, false) + jc.EXPECT().SetConfig(j, emptyUpdatedConfig) + + h.DispatchDownloads(j) + + eventuallyEmpty(1) + eventuallyNoti(1, true, "") + }) + It("should handle downloading failure", func() { + c := defaultSetStatusAndInfo(4, false) + jc.EXPECT().SetStatusAndInfo(j, weles.JOB_DOWNLOADING, "Failed to download artifact").After(c) + defaultGetConfig() + defaultPush(7, false) + defaultCreate(2, false) + defaultSetConfig() + + h.DispatchDownloads(j) + + expectPath(1, 0, 7) + expectInfo(1, true, 7) + + sendChange(0, 3, weles.AM_READY) + sendChange(3, 4, weles.AM_FAILED) + + eventuallyNoti(1, false, formatDownload) + expectPath(1, 4, 7) + eventuallyInfoEmpty(1) + + sendChange(4, 7, weles.AM_DOWNLOADING) + eventuallyPathEmpty(1) + }) + It("should block reply until configuration is saved and all artifacts are downloaded", func() { + defaultSetStatusAndInfo(8, false) + defaultGetConfig() + defaultPush(7, false) + defaultCreate(2, false) + + holdDownload := sync.WaitGroup{} + holdDownload.Add(1) + setConfigReached := sync.WaitGroup{} + setConfigReached.Add(1) + + jc.EXPECT().SetConfig(j, updatedConfig).Do(func(weles.JobID, weles.Config) { + setConfigReached.Done() + holdDownload.Wait() + }) + + go h.DispatchDownloads(j) + + setConfigReached.Wait() + + expectPath(1, 0, 7) + expectInfo(1, false, 7) + + sendChange(0, 7, weles.AM_READY) + holdDownload.Done() + + eventuallyNoti(1, true, "") + eventuallyEmpty(1) + }) + It("should handle failure in updating info", func() { + defaultSetStatusAndInfo(5, true) + defaultGetConfig() + defaultPush(7, false) + defaultCreate(2, false) + defaultSetConfig() + + h.DispatchDownloads(j) + + expectPath(1, 0, 7) + expectInfo(1, true, 7) + + sendChange(0, 7, weles.AM_READY) + + eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error") + eventuallyEmpty(1) + }) + It("should leave no data left if failure response is sent while still processing config", func() { + defaultSetStatusAndInfo(5, true) + defaultGetConfig() + defaultPush(7, false) + defaultCreate(2, false) + + holdDownload := sync.WaitGroup{} + holdDownload.Add(1) + setConfigReached := sync.WaitGroup{} + setConfigReached.Add(1) + + jc.EXPECT().SetConfig(j, updatedConfig).Do(func(weles.JobID, weles.Config) { + setConfigReached.Done() + holdDownload.Wait() + }) + + go h.DispatchDownloads(j) + setConfigReached.Wait() + + expectPath(1, 0, 7) + expectInfo(1, false, 7) + + sendChange(0, 7, weles.AM_READY) + + eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error") + + holdDownload.Done() + + eventuallyEmpty(1) + }) + It("should leave no data left if failure response is sent while pushing", func() { + c := defaultSetStatusAndInfo(1, false) + jc.EXPECT().SetStatusAndInfo(j, weles.JOB_DOWNLOADING, "1 / 1 artifacts ready").Return(err).After(c) + defaultGetConfig() + holdDownload := sync.WaitGroup{} + holdDownload.Add(1) + pushReached := sync.WaitGroup{} + pushReached.Add(1) + + defaultPush(2, false).Do(func(weles.ArtifactDescription, chan weles.ArtifactStatusChange) { + pushReached.Done() + holdDownload.Wait() + }) + + go h.DispatchDownloads(j) + pushReached.Wait() + + expectPath(1, 0, 1) + expectInfo(1, false, 1) + + sendChange(0, 1, weles.AM_READY) + + eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error") + + holdDownload.Done() + sendChange(1, 2, weles.AM_READY) + + eventuallyEmpty(1) + }) + It("should ignore changes to non-terminal states", func() { + defaultSetStatusAndInfo(8, false) + defaultGetConfig() + defaultPush(7, false) + defaultCreate(2, false) + defaultSetConfig() + + h.DispatchDownloads(j) + + expectPath(1, 0, 7) + expectInfo(1, true, 7) + + sendChange(0, 7, weles.AM_DOWNLOADING) + sendChange(0, 7, weles.AM_PENDING) + + expectPath(1, 0, 7) + expectInfo(1, true, 7) + + sendChange(0, 7, weles.AM_READY) + + eventuallyNoti(1, true, "") + eventuallyEmpty(1) + }) + }) +}) -- 2.7.4