+++ /dev/null
-package servicemgr
-
-import (
- "encoding/json"
- "fmt"
- "log"
-
- z4 "github.com/alecthomas/gozmq"
-)
-
-// PushIpcZmq is zmq push function
-func PushIpcZmq(serviceID uint64, msgChan chan MsgFormat, ctlChan chan bool) error {
- ctx, err := z4.NewContext()
- if err != nil {
- log.Println(err.Error())
- return err
- }
- defer ctx.Close()
-
- pushSock, err := ctx.NewSocket(z4.PUSH)
- if err != nil {
- log.Println(err.Error())
- return err
- }
- defer pushSock.Close()
-
- pushSock.Connect(fmt.Sprintf("ipc://%d", serviceID))
-
- for {
- select {
- case msg := <-msgChan:
- log.Println(string(msg.Body.(string)))
- msgBytes, err := json.Marshal(msg)
- if err != nil {
- log.Println(err.Error())
- return err
- }
-
- // Send msg is non-blocking until 1000
- // Guarantee message, message order
- err = pushSock.Send(msgBytes, 0)
- case end := <-ctlChan:
- if end == true {
- return nil
- }
- }
- }
-}
-
-// PullIpcZmq is zmq pull function
-func PullIpcZmq(serviceID uint64, outputMsg chan MsgFormat, ctlChan chan bool) error {
- ctx, err := z4.NewContext()
- if err != nil {
- log.Println(err.Error())
- return err
- }
- defer ctx.Close()
-
- pullSock, err := ctx.NewSocket(z4.PULL)
- if err != nil {
- log.Println(err.Error())
- return err
- }
- defer pullSock.Close()
-
- pullSock.Bind(fmt.Sprintf("ipc://%d", serviceID))
-
- for {
- msg, err := pullSock.Recv(0)
- if err != nil {
- log.Println(err.Error())
- continue
- }
-
- fmt.Println("Recv:", string(msg))
-
- parsedMsg := MsgFormat{}
- json.Unmarshal(msg, &parsedMsg)
-
- outputMsg <- parsedMsg
-
- select {
- case end := <-ctlChan:
- if end == true {
- return nil
- }
- default:
- continue
- }
- }
-}