[Speech API]: running mainloop in crosswalk extension
[profile/ivi/speech-recognition.git] / src / plugins / client-api / w3c-speech / crosswalk-extension / speech_instance.cc
1 // Copyright (c) 2014 Intel Corporation. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "speech_instance.h"
6
7 #include <gio/gio.h>
8 #include <errno.h>
9 #include <fcntl.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/types.h>
13 #include <sys/select.h>
14 #include <sys/socket.h>
15 #include <sys/un.h>
16 #include <unistd.h>
17
18 #include <string>
19
20 #include "speech_logs.h"
21
22 #define WINTHORP_SERVER_SOCKET "@winthorpe.w3c-speech"
23 #ifdef TIZEN
24 // static
25 void SpeechInstance::SetupMainloop(void* data) {
26   SpeechInstance* self = reinterpret_cast<SpeechInstance*>(data);
27
28   g_main_loop_run(self->main_loop_);
29 }
30 #endif  // TIZEN
31
32 SpeechInstance::SpeechInstance()
33 #ifdef TIZEN
34     : main_loop_(g_main_loop_new(0, FALSE))
35     , thread_(SpeechInstance::SetupMainloop, this)
36     , fd_(-1)
37 #else
38     : fd_(-1)
39 #endif  // TIZEN
40     , channel_(NULL)
41     , watcher_id_(0)
42     , pending_request_timer_(0)
43     , pending_reply_timer_(0) {
44 #ifdef TIZEN
45     thread_.detach();
46 #endif  // TIZEN
47   if ((fd_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
48     ERR("Failed to create socket: %s", strerror(errno));
49     fd_ = -1;
50     return;
51   }
52
53   struct sockaddr_un server;
54   memset(&server, 0, sizeof(server));
55   server.sun_family = AF_UNIX,
56   strncpy(server.sun_path, WINTHORP_SERVER_SOCKET, sizeof(server.sun_path) - 1);
57
58   int len = SUN_LEN(&server);
59   DBG("Socket path : %s", server.sun_path + 1);
60   server.sun_path[0] = 0;
61
62   if (connect(fd_, (struct sockaddr *)&server, len)) {
63     ERR("Failed to connect to server : %s", strerror(errno));
64     close(fd_);
65     fd_ = -1;
66     return;
67   }
68
69   channel_ = g_io_channel_unix_new(fd_);
70   GIOCondition flags = GIOCondition(G_IO_IN | G_IO_ERR | G_IO_HUP);
71   watcher_id_ = g_io_add_watch(channel_, flags, IOWatchCb, this);
72
73   DBG("Connected to server");
74 }
75
76 SpeechInstance::~SpeechInstance() {
77   if (watcher_id_)
78     g_source_remove(watcher_id_);
79   if (pending_reply_timer_)
80     g_source_remove(pending_reply_timer_);
81   if (pending_request_timer_)
82     g_source_remove(pending_request_timer_);
83   if (fd_)
84     close(fd_);
85   if (channel_)
86     g_io_channel_unref(channel_);
87
88 #ifdef TIZEN
89   g_main_loop_quit(main_loop_);
90   g_main_loop_unref(main_loop_);
91 #endif  // TIZEN
92 }
93
94 // static
95 gboolean SpeechInstance::IOWatchCb(GIOChannel *c,
96                                    GIOCondition cond,
97                                    gpointer userdata) {
98   SpeechInstance *self = reinterpret_cast<SpeechInstance*>(userdata);
99
100   (void)c;
101
102   DBG("IO Event on socket : %d", cond);
103
104   switch (cond) {
105     case G_IO_HUP:
106     case G_IO_ERR:
107       // TODO(avalluri): raise error and close the connection
108       break;
109     case G_IO_IN: {
110       char *reply = NULL;
111       uint32_t size = 0;
112
113       if ((size = self->ReadReply(&reply))) {
114         self->PostMessage(reply);
115         free(reply);
116       }
117
118       break;
119     }
120     default:
121       break;
122   }
123
124   return TRUE;
125 }
126
127 bool SpeechInstance::SendRequest(const char *message) {
128   uint32_t size = ((uint32_t)strlen(message));
129   uint32_t size_be = htobe32(size);
130
131   if (fd_ == -1) {
132     ERR("Socket not connected!");
133     return false;
134   }
135
136   if (send(fd_, static_cast<void*>(&size_be), sizeof(size_be), 0) < 0) {
137     WARN("Failed to send message size: %s", strerror(errno));
138     return false;
139   }
140
141   void *buf = const_cast<void*>(static_cast<const void *>(message));
142   ssize_t len = 0;
143   while (size && (len = send(fd_, buf, size, 0)) < size) {
144     if (len < 0) {
145       WARN("Failed to send message to server: %s", strerror(errno));
146       return false;
147     }
148     size -= len;
149     buf = static_cast<char*>(buf) + len;
150   }
151
152   return true;
153 }
154
155 uint32_t SpeechInstance::ReadReply(char **reply) {
156   uint32_t size_be = 0;
157
158   if (recv(fd_, static_cast<void*>(&size_be),
159       sizeof(size_be), MSG_WAITALL) < 0) {
160     WARN("Failed read server reply: %s", strerror(errno));
161     return 0;
162   }
163
164   uint32_t size = be32toh(size_be);
165   DBG("Received message size : %u", size);
166   char *message = static_cast<char *>(malloc(size + 1));
167   memset(message, 0, size);
168
169   // FIXME: loop through till complete read
170   if (recv(fd_, message, size, MSG_WAITALL) < 0) {
171     WARN("Failed to read server message with size '%u': %s",
172         size, strerror(errno));
173     free(message);
174     return 0;
175   }
176   message[size] = '\0';
177
178   DBG("Recived message : %s", message);
179
180   if (reply) *reply = message;
181
182   return size;
183 }
184
185 // static
186 gboolean SpeechInstance::ProcessPendingRepliesCb(gpointer data) {
187   SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
188
189   if (!self) {
190     WARN("asset(self)");
191     return FALSE;
192   }
193
194   std::string reply = self->pending_replies_.front();
195   self->PostMessage(reply.c_str());
196   self->pending_replies_.pop();
197
198   if (self->pending_replies_.empty()) {
199     self->pending_reply_timer_ = 0;
200     return FALSE;
201   }
202
203   return TRUE;
204 }
205
206 // static
207 gboolean SpeechInstance::ProcessPendingRequestsCb(gpointer data) {
208   SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
209   if (!self) {
210     WARN("assert(self)");
211     return FALSE;
212   }
213
214   std::string &request = self->pending_requests_.front();
215   if (!self->SendRequest(request.data())) {
216     picojson::value in;
217     picojson::object out;
218
219     picojson::parse(in, request.data(), request.data() + request.size(), NULL);
220     out["reqno"] = in.get("reqno");
221     out["error"] = picojson::value("network");
222     out["message"] = picojson::value("failed to connect to server");
223
224     self->QueueReply(picojson::value(out).serialize());
225   }
226   self->pending_requests_.pop();
227
228   if (self->pending_requests_.empty()) {
229     self->pending_request_timer_ = 0;
230     return FALSE;
231   }
232
233   return TRUE;
234 }
235
236 void SpeechInstance::QueueReply(const std::string& reply) {
237   pending_replies_.push(reply);
238   if (!pending_reply_timer_) {
239     pending_reply_timer_ = g_idle_add(ProcessPendingRepliesCb,
240         static_cast<gpointer>(this));
241   }
242 }
243
244 void SpeechInstance::QueueRequest(const std::string& req) {
245   pending_requests_.push(req);
246   if (!pending_request_timer_) {
247     pending_request_timer_ = g_idle_add(ProcessPendingRequestsCb,
248         static_cast<gpointer>(this));
249   }
250 }
251
252 void SpeechInstance::HandleSyncMessage(const char* message) {
253   picojson::value v, out;
254   std::string err;
255
256   DBG("Message: %s", message);
257
258   if (!SendRequest(message)) {
259     picojson::object obj;
260
261     obj["error"] = picojson::value("network");
262     obj["message"] = picojson::value("server connection failure");
263     out = picojson::value(obj);
264   } else {
265     picojson::parse(v, message, message + strlen(message), &err);
266     if (!err.empty())
267       return;
268
269     const std::string& req_id = v.get("reqno").to_str();
270
271     do {
272       char *reply = NULL;
273       uint32_t size;
274
275       if ((size = ReadReply(&reply)) != 0) {
276         picojson::parse(out, reply, reply + size, &err);
277         free(reply);
278         if (!err.empty()) {
279           WARN("Failed to read server reply: %s", strerror(errno));
280           // TODO(avalluri): fill error details in out
281           break;
282         } else if (out.get("reqno").to_str() == req_id) {
283           break;
284         } else {
285           QueueReply(out.serialize());
286         }
287       }
288     } while (0);
289   }
290
291   SendSyncReply(out.serialize().c_str());
292 }
293
294 void SpeechInstance::HandleMessage(const char* message) {
295   if (!message)
296     return;
297
298   QueueRequest(message);
299 }