Add event request handler
authorjw_wonny.cha <jw_wonny.cha@samsung.com>
Mon, 18 Mar 2019 11:12:12 +0000 (20:12 +0900)
committerjw_wonny.cha <jw_wonny.cha@samsung.com>
Mon, 18 Mar 2019 11:12:12 +0000 (20:12 +0900)
src/restapi/v1/restapi.go
src/servicemgr/message_queue.go [new file with mode: 0644]
src/servicemgr/service_agent.go

index d4b09c4..411cf1b 100644 (file)
@@ -95,9 +95,7 @@ func APIV1ServicemgrServicesServiceIDPost(w http.ResponseWriter, r *http.Request
        serviceID := vars["serviceid"]
        id, err := strconv.ParseUint(serviceID, 10, 64)
 
-       exist, _ := servicemgr.ServiceNameMap.Get(id)
-
-       if exist == nil || err != nil {
+       if len(serviceID) == 0 || err != nil {
                smbytes, _ := json.Marshal(servicemgr.ServiceCreationResponse{"NOTFOUND"})
                writeJSONResponse(w, smbytes, http.StatusBadRequest)
        } else {
@@ -112,7 +110,21 @@ func APIV1ServicemgrServicesServiceIDPost(w http.ResponseWriter, r *http.Request
 
 // APIV1ServicemgrEventServiceIDPost function
 func APIV1ServicemgrEventServiceIDPost(w http.ResponseWriter, r *http.Request) {
+       log.Printf("[%s] APIV1ServicemgrEventServiceIDPost", logPrefix)
+
+       vars := mux.Vars(r)
+       serviceID := vars["serviceid"]
+       id, err := strconv.ParseUint(serviceID, 10, 64)
+
+       if len(serviceID) == 0 || err != nil {
+               w.Header().Set("Content-Type", "application/json; charset=UTF-8")
+               w.WriteHeader(http.StatusBadRequest)
+       } else {
+               servicemgr.DataPathHandler(w, r, id)
 
+               w.Header().Set("Content-Type", "application/json; charset=UTF-8")
+               w.WriteHeader(http.StatusOK)
+       }
 }
 
 // APIV1ServicemgrServicesServiceIDDelete function
diff --git a/src/servicemgr/message_queue.go b/src/servicemgr/message_queue.go
new file mode 100644 (file)
index 0000000..d1d2baf
--- /dev/null
@@ -0,0 +1,71 @@
+package servicemgr
+
+import (
+       "encoding/json"
+       "fmt"
+       "log"
+
+       z4 "github.com/alecthomas/gozmq"
+)
+
+// Push is zmq push function
+func Push(serviceID uint64, msgChan chan MsgFormat, ctlChan chan bool) error {
+       ctx, err := z4.NewContext()
+       if err != nil {
+               log.Println("Failed to create new context")
+       }
+       defer ctx.Close()
+
+       pushSock, err := ctx.NewSocket(z4.PUSH)
+       if err != nil {
+               log.Println(serviceID, ": Failed to create new PUSH socket")
+               return err
+       }
+       defer pushSock.Close()
+
+       for {
+               msg := <-msgChan
+
+               log.Println(string(msg.Body.(string)))
+               msgBytes, err := json.Marshal(msg)
+               if err != nil {
+                       return err
+               }
+
+               // Send msg is non-blocking until 1000
+               // Guarantee message, message order
+               err = pushSock.Send(msgBytes, 0)
+
+               select {
+               case end := <-ctlChan:
+                       if end == true {
+                               return nil
+                       }
+               default:
+                       continue
+               }
+       }
+}
+
+// Pull is zmq pull function
+func Pull(serviceID uint64) error {
+       ctx, err := z4.NewContext()
+       if err != nil {
+               log.Println("Failed to create new context")
+       }
+       defer ctx.Close()
+
+       pullSock, err := ctx.NewSocket(z4.PULL)
+       if err != nil {
+               log.Println(serviceID, ": Failed to create new PUSH socket")
+               return err
+       }
+       defer pullSock.Close()
+
+       pullSock.Bind(fmt.Sprintf("ipc://%d", serviceID))
+
+       for {
+               msg, _ := pullSock.Recv(0)
+               fmt.Println("Recv:", string(msg))
+       }
+}
index b032d1d..91e9ded 100644 (file)
@@ -8,11 +8,6 @@ import (
        "time"
 )
 
-// ServiceAgent is interface for basic-service
-type ServiceAgent interface {
-       execute() error
-}
-
 func makeTime() string {
        t := time.Now()
        return t.Format(time.RFC3339)
@@ -27,10 +22,13 @@ func Create(serviceParam ServiceParam) []byte {
 
        for index := 0; index < instanceCount; index++ {
                var cmd *exec.Cmd
+
                log.Println("servicName: ", serviceParam.ServiceName)
-               id := CreateServiceMap(cmd, serviceParam.ServiceName, serviceParam.AppName)
+               id, msgChan, ctlChan := CreateServiceMap(cmd, serviceParam.ServiceName, serviceParam.AppName)
                log.Println("id : ", id)
 
+               go Push(id, msgChan, ctlChan)
+
                serviceInfo := ServiceExecutionItem{id, makeTime()}
                ret.ServiceList[index] = serviceInfo
        }
@@ -41,7 +39,9 @@ func Create(serviceParam ServiceParam) []byte {
 
 // Run is for executing service
 func Run(distService DistService, serviceID uint64) {
-       var service ServiceAgent
+       if checkServiceName(serviceID) == false {
+               return
+       }
 
        var args []string
        for _, userParam := range distService.UserParam.Args {
@@ -52,10 +52,8 @@ func Run(distService DistService, serviceID uint64) {
        args = append(args, strconv.Itoa(distService.SystemParam.Port))
        args = append(args, "--remote")
 
-       name, _ := ServiceNameMap.Get(serviceID)
-       serviceName, _ := name.(string)
-
-       service = PlayService{serviceID, serviceName, args}
+       serviceName, _ := GetServiceName(serviceID)
 
+       service := Service{serviceID, serviceName, args}
        service.execute()
 }