65424c443a2079049790020c6ef72b2b53d772a9
[profile/ivi/speech-recognition.git] / src / plugins / client-api / w3c-speech / crosswalk-extension / speech_instance.cc
1 // Copyright (c) 2014 Intel Corporation. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "speech_instance.h"
6
7 #include <gio/gio.h>
8 #include <errno.h>
9 #include <fcntl.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/types.h>
13 #include <sys/select.h>
14 #include <sys/socket.h>
15 #include <sys/un.h>
16 #include <unistd.h>
17
18 #include <string>
19
20 #include "speech_logs.h"
21
22 #define WINTHORP_SERVER_SOCKET "@winthorpe.w3c-speech"
23
24 SpeechInstance::SpeechInstance()
25    : fd_(-1)
26     , channel_(NULL)
27     , watcher_id_(0)
28     , pending_request_timer_(0)
29     , pending_reply_timer_(0) {
30   if ((fd_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
31     ERR("Failed to create socket: %s", strerror(errno));
32     fd_ = -1;
33     return;
34   }
35
36   struct sockaddr_un server;
37   memset(&server, 0, sizeof(server));
38   server.sun_family = AF_UNIX,
39   strncpy(server.sun_path, WINTHORP_SERVER_SOCKET, sizeof(server.sun_path) - 1);
40
41   int len = SUN_LEN(&server);
42   DBG("Socket path : %s", server.sun_path + 1);
43   server.sun_path[0] = 0;
44
45   if (connect(fd_, (struct sockaddr *)&server, len)) {
46     ERR("Failed to connect to server : %s", strerror(errno));
47     close(fd_);
48     fd_ = -1;
49     return;
50   }
51
52   channel_ = g_io_channel_unix_new(fd_);
53   GIOCondition flags = GIOCondition(G_IO_IN | G_IO_ERR | G_IO_HUP);
54   watcher_id_ = g_io_add_watch(channel_, flags, IOWatchCb, this);
55
56   DBG("Connected to server");
57 }
58
59 SpeechInstance::~SpeechInstance() {
60   if (watcher_id_)
61     g_source_remove(watcher_id_);
62   if (pending_reply_timer_)
63     g_source_remove(pending_reply_timer_);
64   if (pending_request_timer_)
65     g_source_remove(pending_request_timer_);
66   if (fd_)
67     close(fd_);
68   if (channel_)
69     g_io_channel_unref(channel_);
70 }
71
72 // static
73 gboolean SpeechInstance::IOWatchCb(GIOChannel *c,
74                                    GIOCondition cond,
75                                    gpointer userdata) {
76   SpeechInstance *self = reinterpret_cast<SpeechInstance*>(userdata);
77
78   (void)c;
79
80   DBG("IO Event on socket : %d", cond);
81
82   switch (cond) {
83     case G_IO_HUP:
84     case G_IO_ERR:
85       // TODO(avalluri): raise error and close the connection
86       break;
87     case G_IO_IN: {
88       char *reply = NULL;
89       uint32_t size = 0;
90
91       if ((size = self->ReadReply(&reply))) {
92         self->PostMessage(reply);
93         free(reply);
94       }
95
96       break;
97     }
98     default:
99       break;
100   }
101
102   return TRUE;
103 }
104
105 bool SpeechInstance::SendRequest(const char *message) {
106   uint32_t size = ((uint32_t)strlen(message));
107   uint32_t size_be = htobe32(size);
108
109   if (fd_ == -1) {
110     ERR("Socket not connected!");
111     return false;
112   }
113
114   if (send(fd_, static_cast<void*>(&size_be), sizeof(size_be), 0) < 0) {
115     WARN("Failed to send message size: %s", strerror(errno));
116     return false;
117   }
118
119   void *buf = const_cast<void*>(static_cast<const void *>(message));
120   ssize_t len = 0;
121   while (size && (len = send(fd_, buf, size, 0)) < size) {
122     if (len < 0) {
123       WARN("Failed to send message to server: %s", strerror(errno));
124       return false;
125     }
126     size -= len;
127     buf = static_cast<char*>(buf) + len;
128   }
129
130   return true;
131 }
132
133 uint32_t SpeechInstance::ReadReply(char **reply) {
134   uint32_t size_be = 0;
135
136   if (recv(fd_, static_cast<void*>(&size_be),
137       sizeof(size_be), MSG_WAITALL) < 0) {
138     WARN("Failed read server reply: %s", strerror(errno));
139     return 0;
140   }
141
142   uint32_t size = be32toh(size_be);
143   DBG("Received message size : %u", size);
144   char *message = static_cast<char *>(malloc(size + 1));
145   memset(message, 0, size);
146
147   // FIXME: loop through till complete read
148   if (recv(fd_, message, size, MSG_WAITALL) < 0) {
149     WARN("Failed to read server message with size '%u': %s",
150         size, strerror(errno));
151     free(message);
152     return 0;
153   }
154   message[size] = '\0';
155
156   DBG("Recived message : %s", message);
157
158   if (reply) *reply = message;
159
160   return size;
161 }
162
163 // static
164 gboolean SpeechInstance::ProcessPendingRepliesCb(gpointer data) {
165   SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
166
167   if (!self) {
168     WARN("asset(self)");
169     return FALSE;
170   }
171
172   std::string reply = self->pending_replies_.front();
173   self->PostMessage(reply.c_str());
174   self->pending_replies_.pop();
175
176   if (self->pending_replies_.empty()) {
177     self->pending_reply_timer_ = 0;
178     return FALSE;
179   }
180
181   return TRUE;
182 }
183
184 // static
185 gboolean SpeechInstance::ProcessPendingRequestsCb(gpointer data) {
186   SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
187   if (!self) {
188     WARN("assert(self)");
189     return FALSE;
190   }
191
192   std::string &request = self->pending_requests_.front();
193   if (!self->SendRequest(request.data())) {
194     picojson::value in;
195     picojson::object out;
196
197     picojson::parse(in, request.data(), request.data() + request.size(), NULL);
198     out["reqno"] = in.get("reqno");
199     out["error"] = picojson::value("network");
200     out["message"] = picojson::value("failed to connect to server");
201
202     self->QueueReply(picojson::value(out).serialize());
203   }
204   self->pending_requests_.pop();
205
206   if (self->pending_requests_.empty()) {
207     self->pending_request_timer_ = 0;
208     return FALSE;
209   }
210
211   return TRUE;
212 }
213
214 void SpeechInstance::QueueReply(const std::string& reply) {
215   pending_replies_.push(reply);
216   if (!pending_reply_timer_) {
217     pending_reply_timer_ = g_idle_add(ProcessPendingRepliesCb,
218         static_cast<gpointer>(this));
219   }
220 }
221
222 void SpeechInstance::QueueRequest(const std::string& req) {
223   pending_requests_.push(req);
224   if (!pending_request_timer_) {
225     pending_request_timer_ = g_idle_add(ProcessPendingRequestsCb,
226         static_cast<gpointer>(this));
227   }
228 }
229
230 void SpeechInstance::HandleSyncMessage(const char* message) {
231   picojson::value v, out;
232   std::string err;
233
234   DBG("Message: %s", message);
235
236   if (!SendRequest(message)) {
237     picojson::object obj;
238
239     obj["error"] = picojson::value("network");
240     obj["message"] = picojson::value("server connection failure");
241     out = picojson::value(obj);
242   } else {
243     picojson::parse(v, message, message + strlen(message), &err);
244     if (!err.empty())
245       return;
246
247     const std::string& req_id = v.get("reqno").to_str();
248
249     do {
250       char *reply = NULL;
251       uint32_t size;
252
253       if ((size = ReadReply(&reply)) != 0) {
254         picojson::parse(out, reply, reply + size, &err);
255         free(reply);
256         if (!err.empty()) {
257           WARN("Failed to read server reply: %s", strerror(errno));
258           // TODO(avalluri): fill error details in out
259           break;
260         } else if (out.get("reqno").to_str() == req_id) {
261           break;
262         } else {
263           QueueReply(out.serialize());
264         }
265       }
266     } while (0);
267   }
268
269   SendSyncReply(out.serialize().c_str());
270 }
271
272 void SpeechInstance::HandleMessage(const char* message) {
273   if (!message)
274     return;
275
276   QueueRequest(message);
277 }