[W3C-Speech]: Fix issues found in extension code
[profile/ivi/speech-recognition.git] / src / plugins / client-api / w3c-speech / crosswalk-extension / speech_instance.cc
1 // Copyright (c) 2014 Intel Corporation. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "speech_instance.h"
6
7 #include <gio/gio.h>
8 #include <errno.h>
9 #include <fcntl.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/types.h>
13 #include <sys/select.h>
14 #include <sys/socket.h>
15 #include <sys/un.h>
16 #include <unistd.h>
17
18 #include <string>
19
20 #include "speech_logs.h"
21
22 #define WINTHORP_SERVER_SOCKET "@winthorpe.w3c-speech"
23 #ifdef TIZEN
24 // static
25 void SpeechInstance::SetupMainloop(void* data) {
26   SpeechInstance* self = reinterpret_cast<SpeechInstance*>(data);
27
28   DBG("Running Mainloop...");
29
30   g_main_loop_run(self->main_loop_);
31 }
32 #endif  // TIZEN
33
34 SpeechInstance::SpeechInstance()
35 #ifdef TIZEN
36     : main_loop_(g_main_loop_new(0, FALSE))
37     , thread_(SpeechInstance::SetupMainloop, this)
38     , fd_(-1)
39 #else
40     : fd_(-1)
41 #endif  // TIZEN
42     , channel_(NULL)
43     , watcher_id_(0)
44     , pending_request_timer_(0)
45     , pending_reply_timer_(0)
46     , is_waiting_for_reply_(false) {
47 #ifdef TIZEN
48     thread_.detach();
49 #endif  // TIZEN
50   if ((fd_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
51     ERR("Failed to create socket: %s", strerror(errno));
52     fd_ = -1;
53     return;
54   }
55
56   struct sockaddr_un server;
57   memset(&server, 0, sizeof(server));
58   server.sun_family = AF_UNIX,
59   strncpy(server.sun_path, WINTHORP_SERVER_SOCKET, sizeof(server.sun_path) - 1);
60
61   int len = SUN_LEN(&server);
62   DBG("Socket path : %s", server.sun_path + 1);
63   server.sun_path[0] = 0;
64
65   if (connect(fd_, (struct sockaddr *)&server, len)) {
66     ERR("Failed to connect to server : %s", strerror(errno));
67     close(fd_);
68     fd_ = -1;
69     return;
70   }
71
72   channel_ = g_io_channel_unix_new(fd_);
73   GIOCondition flags = GIOCondition(G_IO_IN | G_IO_ERR | G_IO_HUP);
74   watcher_id_ = g_io_add_watch(channel_, flags, IOWatchCb, this);
75
76   DBG("Connected to server");
77 }
78
79 SpeechInstance::~SpeechInstance() {
80   if (watcher_id_)
81     g_source_remove(watcher_id_);
82   if (pending_reply_timer_)
83     g_source_remove(pending_reply_timer_);
84   if (pending_request_timer_)
85     g_source_remove(pending_request_timer_);
86   if (fd_)
87     close(fd_);
88   if (channel_)
89     g_io_channel_unref(channel_);
90
91 #ifdef TIZEN
92   g_main_loop_quit(main_loop_);
93   g_main_loop_unref(main_loop_);
94 #endif  // TIZEN
95 }
96
97 // static
98 gboolean SpeechInstance::IOWatchCb(GIOChannel *c,
99                                    GIOCondition cond,
100                                    gpointer userdata) {
101   SpeechInstance *self = reinterpret_cast<SpeechInstance*>(userdata);
102
103   (void)c;
104
105   DBG("IO Event on socket : %d", cond);
106
107   switch (cond) {
108     case G_IO_HUP:
109     case G_IO_ERR:
110       // TODO(avalluri): raise error and close the connection
111       break;
112     case G_IO_IN: {
113       char *reply = NULL;
114       uint32_t size = 0;
115
116       if (self->is_waiting_for_reply_) break;
117
118       if ((size = self->ReadReply(&reply))) {
119         self->PostMessage(reply);
120         free(reply);
121       }
122
123       break;
124     }
125     default:
126       break;
127   }
128
129   return TRUE;
130 }
131
132 bool SpeechInstance::SendRequest(const char *message) {
133   uint32_t size = ((uint32_t)strlen(message));
134   uint32_t size_be = htobe32(size);
135
136   if (fd_ == -1) {
137     ERR("Socket not connected!");
138     return false;
139   }
140
141   if (send(fd_, static_cast<void*>(&size_be), sizeof(size_be), 0) < 0) {
142     WARN("Failed to send message size: %s", strerror(errno));
143     return false;
144   }
145
146   void *buf = const_cast<void*>(static_cast<const void *>(message));
147   ssize_t len = 0;
148   while (size && (len = send(fd_, buf, size, 0)) < size) {
149     if (len < 0) {
150       WARN("Failed to send message to server: %s", strerror(errno));
151       return false;
152     }
153     size -= len;
154     buf = static_cast<char*>(buf) + len;
155   }
156
157   return true;
158 }
159
160 uint32_t SpeechInstance::ReadReply(char **reply) {
161   uint32_t size_be = 0;
162
163   if (recv(fd_, static_cast<void*>(&size_be),
164       sizeof(size_be), MSG_WAITALL) < 0) {
165     WARN("Failed read server reply: %s", strerror(errno));
166     return 0;
167   }
168
169   uint32_t size = be32toh(size_be);
170   DBG("Received message size : %u", size);
171   char *message = static_cast<char *>(malloc(size + 1));
172   memset(message, 0, size);
173
174   // FIXME: loop through till complete read
175   if (recv(fd_, message, size, MSG_WAITALL) < 0) {
176     WARN("Failed to read server message with size '%u': %s",
177         size, strerror(errno));
178     free(message);
179     return 0;
180   }
181   message[size] = '\0';
182
183   DBG("Recived message : %s", message);
184
185   if (reply) *reply = message;
186
187   return size;
188 }
189
190 // static
191 gboolean SpeechInstance::ProcessPendingRepliesCb(gpointer data) {
192   SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
193
194   if (!self) {
195     WARN("asset(self)");
196     return FALSE;
197   }
198
199   std::string reply = self->pending_replies_.front();
200   self->PostMessage(reply.c_str());
201   self->pending_replies_.pop();
202
203   if (self->pending_replies_.empty()) {
204     self->pending_reply_timer_ = 0;
205     return FALSE;
206   }
207
208   return TRUE;
209 }
210
211 // static
212 gboolean SpeechInstance::ProcessPendingRequestsCb(gpointer data) {
213   SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
214   if (!self) {
215     WARN("assert(self)");
216     return FALSE;
217   }
218
219   std::string &request = self->pending_requests_.front();
220   if (!self->SendRequest(request.data())) {
221     picojson::value in;
222     picojson::object out;
223
224     picojson::parse(in, request.data(), request.data() + request.size(), NULL);
225     out["reqno"] = in.get("reqno");
226     out["error"] = picojson::value("network");
227     out["message"] = picojson::value("failed to connect to server");
228
229     self->QueueReply(picojson::value(out).serialize());
230   }
231   self->pending_requests_.pop();
232
233   if (self->pending_requests_.empty()) {
234     self->pending_request_timer_ = 0;
235     return FALSE;
236   }
237
238   return TRUE;
239 }
240
241 void SpeechInstance::QueueReply(const std::string& reply) {
242   pending_replies_.push(reply);
243   if (!pending_reply_timer_) {
244     pending_reply_timer_ = g_idle_add(ProcessPendingRepliesCb,
245         static_cast<gpointer>(this));
246   }
247 }
248
249 void SpeechInstance::QueueRequest(const std::string& req) {
250   pending_requests_.push(req);
251   if (!pending_request_timer_) {
252     pending_request_timer_ = g_idle_add(ProcessPendingRequestsCb,
253         static_cast<gpointer>(this));
254   }
255 }
256
257 void SpeechInstance::HandleSyncMessage(const char* message) {
258   picojson::value v, out;
259   std::string err;
260
261   DBG("Message: %s", message);
262
263   if (!SendRequest(message)) {
264     picojson::object obj;
265
266     obj["error"] = picojson::value("network");
267     obj["message"] = picojson::value("server connection failure");
268     out = picojson::value(obj);
269   } else {
270
271     is_waiting_for_reply_ = true;
272     picojson::parse(v, message, message + strlen(message), &err);
273     if (!err.empty())
274       return;
275
276     const std::string& req_id = v.get("reqno").to_str();
277
278     do {
279       char *reply = NULL;
280       uint32_t size;
281
282       if ((size = ReadReply(&reply)) != 0) {
283         is_waiting_for_reply_ = false;
284         picojson::parse(out, reply, reply + size, &err);
285         free(reply);
286         if (!err.empty()) {
287           WARN("Failed to read server reply: %s", strerror(errno));
288           // TODO(avalluri): fill error details in out
289           break;
290         } else if (out.get("reqno").to_str() == req_id) {
291           break;
292         } else {
293           QueueReply(out.serialize());
294         }
295       }
296     } while (0);
297   }
298
299   SendSyncReply(out.serialize().c_str());
300 }
301
302 void SpeechInstance::HandleMessage(const char* message) {
303   if (!message)
304     return;
305
306   QueueRequest(message);
307 }