71a9306f5eaa45b90a7fbf648b46af2216cf5269
[platform/core/system/edge-orchestration.git] / src / servicemgr / message_queue.go
1 package servicemgr
2
3 import (
4         "encoding/json"
5         "fmt"
6         "log"
7
8         z4 "github.com/alecthomas/gozmq"
9 )
10
11 // Push is zmq push function
12 func Push(serviceID uint64, msgChan chan MsgFormat, ctlChan chan bool) error {
13         ctx, err := z4.NewContext()
14         if err != nil {
15                 log.Println(err.Error())
16                 return err
17         }
18         defer ctx.Close()
19
20         pushSock, err := ctx.NewSocket(z4.PUSH)
21         if err != nil {
22                 log.Println(err.Error())
23                 return err
24         }
25         defer pushSock.Close()
26
27         pushSock.Connect(fmt.Sprintf("ipc://%d", serviceID))
28
29         for {
30                 select {
31                 case msg := <-msgChan:
32                         log.Println(string(msg.Body.(string)))
33                         msgBytes, err := json.Marshal(msg)
34                         if err != nil {
35                                 log.Println(err.Error())
36                                 return err
37                         }
38
39                         // Send msg is non-blocking until 1000
40                         // Guarantee message, message order
41                         err = pushSock.Send(msgBytes, 0)
42                 case end := <-ctlChan:
43                         if end == true {
44                                 return nil
45                         }
46                 }
47         }
48 }
49
50 // Pull is zmq pull function
51 func Pull(serviceID uint64, outputMsg chan MsgFormat, ctlChan chan bool) error {
52         ctx, err := z4.NewContext()
53         if err != nil {
54                 log.Println(err.Error())
55                 return err
56         }
57         defer ctx.Close()
58
59         pullSock, err := ctx.NewSocket(z4.PULL)
60         if err != nil {
61                 log.Println(err.Error())
62                 return err
63         }
64         defer pullSock.Close()
65
66         pullSock.Bind(fmt.Sprintf("ipc://%d", serviceID))
67
68         for {
69                 msg, err := pullSock.Recv(0)
70                 if err != nil {
71                         log.Println(err.Error())
72                         continue
73                 }
74
75                 fmt.Println("Recv:", string(msg))
76
77                 parsedMsg := MsgFormat{}
78                 json.Unmarshal(msg, &parsedMsg)
79
80                 outputMsg <- parsedMsg
81
82                 select {
83                 case end := <-ctlChan:
84                         if end == true {
85                                 return nil
86                         }
87                 default:
88                         continue
89                 }
90         }
91 }