Increased pool test timeout (hate time based tests...)
This commit is contained in:
+12
-11
@@ -3,13 +3,14 @@ package pool
|
|||||||
type Executor func(workload interface{})
|
type Executor func(workload interface{})
|
||||||
|
|
||||||
type Pool struct {
|
type Pool struct {
|
||||||
name string
|
name string
|
||||||
item interface{}
|
item interface{}
|
||||||
workers []worker
|
workers []worker
|
||||||
exec Executor
|
exec Executor
|
||||||
|
workerChannel chan chan work
|
||||||
|
queue chan work // receives jobs to send to workers
|
||||||
|
end chan bool // when receives bool stops workers
|
||||||
//queue *dque.DQue
|
//queue *dque.DQue
|
||||||
queue chan work // receives jobs to send to workers
|
|
||||||
end chan bool // when receives bool stops workers
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPool(name string, workerCount int, item interface{}, exec Executor) (*Pool, error) {
|
func NewPool(name string, workerCount int, item interface{}, exec Executor) (*Pool, error) {
|
||||||
@@ -26,12 +27,14 @@ func NewPool(name string, workerCount int, item interface{}, exec Executor) (*Po
|
|||||||
// return nil, err
|
// return nil, err
|
||||||
//}
|
//}
|
||||||
//p.queue = q
|
//p.queue = q
|
||||||
|
|
||||||
|
p.workerChannel = make(chan chan work)
|
||||||
for i := 0; i < workerCount; i++ {
|
for i := 0; i < workerCount; i++ {
|
||||||
worker := worker{
|
worker := worker{
|
||||||
p: p,
|
p: p,
|
||||||
id: i,
|
id: i,
|
||||||
channel: make(chan work),
|
channel: make(chan work),
|
||||||
workerChannel: workerChannel,
|
workerChannel: p.workerChannel,
|
||||||
end: make(chan bool)}
|
end: make(chan bool)}
|
||||||
worker.Start()
|
worker.Start()
|
||||||
p.workers = append(p.workers, worker)
|
p.workers = append(p.workers, worker)
|
||||||
@@ -47,8 +50,8 @@ func NewPool(name string, workerCount int, item interface{}, exec Executor) (*Po
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
case work := <-p.queue:
|
case work := <-p.queue:
|
||||||
worker := <-workerChannel // wait for available channel
|
worker := <-p.workerChannel // wait for available channel
|
||||||
worker <- work // dispatch work to worker
|
worker <- work // dispatch work to worker
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -64,8 +67,6 @@ func (p *Pool) Submit(workload interface{}) {
|
|||||||
// return reflect.New(t).Interface()
|
// return reflect.New(t).Interface()
|
||||||
//}
|
//}
|
||||||
//
|
//
|
||||||
var workerChannel = make(chan chan work)
|
|
||||||
|
|
||||||
type work struct {
|
type work struct {
|
||||||
workload interface{}
|
workload interface{}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ var _ = Describe("Pool", func() {
|
|||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
pool.Submit(&testItem{ID: i})
|
pool.Submit(&testItem{ID: i})
|
||||||
}
|
}
|
||||||
Eventually(processed.Len).Should(Equal(5))
|
Eventually(processed.Len, "10s").Should(Equal(5))
|
||||||
Expect(processed).To(ContainElements(0, 1, 2, 3, 4))
|
Expect(processed).To(ContainElements(0, 1, 2, 3, 4))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user