11 z4 "github.com/alecthomas/gozmq"
14 func TestPushMsg(t *testing.T) {
15 msgChan := make(chan servicemgr.MsgFormat, servicemgr.IPCMsgChan)
16 ctlChan := make(chan bool, 1)
18 go servicemgr.Push(uint64(1), msgChan, ctlChan)
19 ctx, err := z4.NewContext()
21 log.Println("Failed to create new context")
25 pullSock, err := ctx.NewSocket(z4.PULL)
29 defer pullSock.Close()
31 pullSock.Bind(fmt.Sprintf("ipc://%d", uint64(1)))
33 for index := 0; index < 20; index++ {
34 msgChan <- servicemgr.MsgFormat{Header: servicemgr.MsgHeader{Type: "Data"}, Body: strconv.Itoa(index)}
36 msg, _ := pullSock.Recv(0)
38 parsedMsg := servicemgr.MsgFormat{}
39 json.Unmarshal(msg, &parsedMsg)
41 AssertEqualStr(t, parsedMsg.Body.(string), strconv.Itoa(index))
47 func TestPullMsg(t *testing.T) {
48 msgChan := make(chan servicemgr.MsgFormat, servicemgr.IPCMsgChan)
49 ctlPushChan := make(chan bool, 1)
51 outputChan := make(chan servicemgr.MsgFormat, servicemgr.IPCMsgChan)
52 ctlPullChan := make(chan bool, 1)
54 go servicemgr.Pull(uint64(1), outputChan, ctlPullChan)
55 go servicemgr.Push(uint64(1), msgChan, ctlPushChan)
57 for index := 0; index < 20; index++ {
58 msgChan <- servicemgr.MsgFormat{Header: servicemgr.MsgHeader{Type: "Data"}, Body: strconv.Itoa(index)}
60 case msg := <-outputChan:
61 AssertEqualStr(t, msg.Body.(string), strconv.Itoa(index))