8 z4 "github.com/alecthomas/gozmq"
11 // Push forwards messages from msgChan to a ZeroMQ PUSH socket.
//
// The visible statements create a context, open a PUSH socket on it, connect
// the socket to the endpoint "ipc://<serviceID>", and send the JSON encoding
// of each MsgFormat received on msgChan. ctlChan delivers a boolean control
// value; how that value is acted upon is not visible in this excerpt.
//
// NOTE(review): this excerpt omits lines (the conditions guarding the log
// calls, the loop/select headers, the returns and the closing braces are not
// shown), so the comments below describe only the statements that are visible.
12 func Push(serviceID uint64, msgChan chan MsgFormat, ctlChan chan bool) error {
13 ctx, err := z4.NewContext()
// Log the context-creation error (the guarding condition is not shown here).
15 log.Println(err.Error())
20 pushSock, err := ctx.NewSocket(z4.PUSH)
// Log the socket-creation error (the guarding condition is not shown here).
22 log.Println(err.Error())
// Close the socket when Push returns.
25 defer pushSock.Close()
// The endpoint is an IPC address derived from the numeric service ID.
// NOTE(review): the result of Connect is discarded; if it returns an error
// (as gozmq's Socket.Connect does — confirm), a failed connect goes unnoticed.
27 pushSock.Connect(fmt.Sprintf("ipc://%d", serviceID))
31 case msg := <-msgChan:
// NOTE(review): the single-value type assertion panics when msg.Body does not
// hold a string; the outer string(...) conversion of a string is a no-op.
32 log.Println(string(msg.Body.(string)))
// Serialize the whole message to JSON for the wire.
33 msgBytes, err := json.Marshal(msg)
// Log the marshalling error (the guarding condition is not shown here).
35 log.Println(err.Error())
39 // Original note: Send does not block until 1000 messages are queued,
40 // and message delivery and message order are guaranteed.
// NOTE(review): 1000 is presumably the socket's high-water mark — confirm
// against the ZeroMQ version and socket options in use.
// NOTE(review): no check of the err assigned below is visible before the next
// case; confirm that a failed send is handled.
41 err = pushSock.Send(msgBytes, 0)
// Receive a control value from ctlChan; its handling is outside this excerpt.
42 case end := <-ctlChan:
50 // Pull receives messages from a ZeroMQ PULL socket and forwards them on outputMsg.
//
// The visible statements create a context, open a PULL socket on it, bind the
// socket to the endpoint "ipc://<serviceID>", receive a raw message, print it,
// decode it from JSON into a MsgFormat, and send the result on outputMsg.
// ctlChan delivers a boolean control value; how that value is acted upon is
// not visible in this excerpt.
//
// NOTE(review): this excerpt omits lines (the conditions guarding the log
// calls, the loop/select headers and the returns are not shown) and the
// function continues past the last visible line, so the comments below
// describe only the statements that are visible.
51 func Pull(serviceID uint64, outputMsg chan MsgFormat, ctlChan chan bool) error {
52 ctx, err := z4.NewContext()
// Log the context-creation error (the guarding condition is not shown here).
54 log.Println(err.Error())
59 pullSock, err := ctx.NewSocket(z4.PULL)
// Log the socket-creation error (the guarding condition is not shown here).
61 log.Println(err.Error())
// Close the socket when Pull returns.
64 defer pullSock.Close()
// The endpoint is an IPC address derived from the numeric service ID.
// NOTE(review): the result of Bind is discarded; if it returns an error
// (as gozmq's Socket.Bind does — confirm), a failed bind goes unnoticed.
66 pullSock.Bind(fmt.Sprintf("ipc://%d", serviceID))
// NOTE(review): with flags 0 this presumably blocks until a message arrives,
// in which case ctlChan is only examined after a message has been received —
// confirm whether that is the intended shutdown behaviour.
69 msg, err := pullSock.Recv(0)
// Log the receive error (the guarding condition is not shown here).
71 log.Println(err.Error())
// Echo the raw payload to standard output.
75 fmt.Println("Recv:", string(msg))
77 parsedMsg := MsgFormat{}
// NOTE(review): the error returned by json.Unmarshal is ignored, so an
// undecodable payload still results in parsedMsg being forwarded below.
78 json.Unmarshal(msg, &parsedMsg)
// Hand the decoded message to the consumer; the send waits until outputMsg
// can accept the value.
80 outputMsg <- parsedMsg
// Receive a control value from ctlChan; its handling is outside this excerpt.
83 case end := <-ctlChan: