}
req.State = INPROGRESS
+ req.Job = &JobInfo{WorkerUUID: worker}
+
if reqs.iterating {
reqs.queue.releaseIterator()
reqs.iterating = false
}
reqs.queue.removeRequest(req)
- // TODO(lwojciechow) assign req.Job.
+ // TODO(mwereski) Get timeout period from default config / user capabilities.
+ timeoutPeriod := time.Hour
+
+ req.Job.Timeout = time.Now().Add(timeoutPeriod)
+ reqs.timeoutTimes.insert(requestTime{time: req.Job.Timeout, req: rid})
+
return nil
}
})
})
Describe("Run", func() {
+ testWorker := WorkerUUID("TestWorker")
+
It("should fail if reqID is unknown", func() {
R.mutex.Lock()
defer R.mutex.Unlock()
Expect(R.queue.length).To(Equal(uint(1)))
- err := R.Run(noreq, WorkerUUID("TestWorker"))
+ err := R.Run(noreq, testWorker)
Expect(err).To(Equal(NotFoundError("Request")))
Expect(R.queue.length).To(Equal(uint(1)))
})
defer R.TerminateIteration()
Expect(R.iterating).To(BeTrue())
Expect(R.queue.length).To(Equal(uint(1)))
- err := R.Run(noreq, WorkerUUID("TestWorker"))
+ err := R.Run(noreq, testWorker)
Expect(err).To(Equal(NotFoundError("Request")))
Expect(R.iterating).To(BeTrue())
Expect(R.queue.length).To(Equal(uint(1)))
R.InitIteration()
defer R.TerminateIteration()
Expect(R.queue.length).To(Equal(uint(1)))
- err := R.Run(req, WorkerUUID("TestWorker"))
+ err := R.Run(req, testWorker)
Expect(err).NotTo(HaveOccurred())
Expect(rinfo.State).To(Equal(INPROGRESS))
+ Expect(rinfo.Job.Timeout).To(BeTemporally(">", time.Now()))
Expect(R.queue.length).To(BeZero())
})
It("should start progress and break iterations when iterating", func() {
defer R.TerminateIteration()
Expect(R.queue.length).To(Equal(uint(1)))
Expect(R.iterating).To(BeTrue())
- err := R.Run(req, WorkerUUID("TestWorker"))
+ err := R.Run(req, testWorker)
Expect(err).NotTo(HaveOccurred())
Expect(rinfo.State).To(Equal(INPROGRESS))
+ Expect(rinfo.Job.Timeout).To(BeTemporally(">", time.Now()))
Expect(R.iterating).To(BeFalse())
Expect(R.queue.length).To(BeZero())
})
- // TODO use and verify Job when Run's implementation is complete.
})
})
})