1 // Copyright (c) 2014 Intel Corporation. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "speech_instance.h"
12 #include <sys/types.h>
13 #include <sys/select.h>
14 #include <sys/socket.h>
20 #include "speech_logs.h"
22 #define WINTHORP_SERVER_SOCKET "@winthorpe.w3c-speech"
25 void SpeechInstance::SetupMainloop(void* data) {
26 SpeechInstance* self = reinterpret_cast<SpeechInstance*>(data);
28 DBG("Running Mainloop...");
30 g_main_loop_run(self->main_loop_);
34 SpeechInstance::SpeechInstance()
36 : main_loop_(g_main_loop_new(0, FALSE))
37 , thread_(SpeechInstance::SetupMainloop, this)
44 , pending_request_timer_(0)
45 , pending_reply_timer_(0)
46 , is_waiting_for_reply_(false) {
50 if ((fd_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
51 ERR("Failed to create socket: %s", strerror(errno));
56 struct sockaddr_un server;
57 memset(&server, 0, sizeof(server));
58 server.sun_family = AF_UNIX,
59 strncpy(server.sun_path, WINTHORP_SERVER_SOCKET, sizeof(server.sun_path) - 1);
61 int len = SUN_LEN(&server);
62 DBG("Socket path : %s", server.sun_path + 1);
63 server.sun_path[0] = 0;
65 if (connect(fd_, (struct sockaddr *)&server, len)) {
66 ERR("Failed to connect to server : %s", strerror(errno));
72 channel_ = g_io_channel_unix_new(fd_);
73 GIOCondition flags = GIOCondition(G_IO_IN | G_IO_ERR | G_IO_HUP);
74 watcher_id_ = g_io_add_watch(channel_, flags, IOWatchCb, this);
76 DBG("Connected to server");
79 SpeechInstance::~SpeechInstance() {
81 g_source_remove(watcher_id_);
82 if (pending_reply_timer_)
83 g_source_remove(pending_reply_timer_);
84 if (pending_request_timer_)
85 g_source_remove(pending_request_timer_);
89 g_io_channel_unref(channel_);
92 g_main_loop_quit(main_loop_);
93 g_main_loop_unref(main_loop_);
98 gboolean SpeechInstance::IOWatchCb(GIOChannel *c,
101 SpeechInstance *self = reinterpret_cast<SpeechInstance*>(userdata);
105 DBG("IO Event on socket : %d", cond);
110 // TODO(avalluri): raise error and close the connection
116 if (self->is_waiting_for_reply_) break;
118 if ((size = self->ReadReply(&reply))) {
119 self->PostMessage(reply);
132 bool SpeechInstance::SendRequest(const char *message) {
133 uint32_t size = ((uint32_t)strlen(message));
134 uint32_t size_be = htobe32(size);
137 ERR("Socket not connected!");
141 if (send(fd_, static_cast<void*>(&size_be), sizeof(size_be), 0) < 0) {
142 WARN("Failed to send message size: %s", strerror(errno));
146 void *buf = const_cast<void*>(static_cast<const void *>(message));
148 while (size && (len = send(fd_, buf, size, 0)) < size) {
150 WARN("Failed to send message to server: %s", strerror(errno));
154 buf = static_cast<char*>(buf) + len;
160 uint32_t SpeechInstance::ReadReply(char **reply) {
161 uint32_t size_be = 0;
163 if (recv(fd_, static_cast<void*>(&size_be),
164 sizeof(size_be), MSG_WAITALL) < 0) {
165 WARN("Failed read server reply: %s", strerror(errno));
169 uint32_t size = be32toh(size_be);
170 DBG("Received message size : %u", size);
171 char *message = static_cast<char *>(malloc(size + 1));
172 memset(message, 0, size);
174 // FIXME: loop through till complete read
175 if (recv(fd_, message, size, MSG_WAITALL) < 0) {
176 WARN("Failed to read server message with size '%u': %s",
177 size, strerror(errno));
181 message[size] = '\0';
183 DBG("Recived message : %s", message);
185 if (reply) *reply = message;
191 gboolean SpeechInstance::ProcessPendingRepliesCb(gpointer data) {
192 SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
199 std::string reply = self->pending_replies_.front();
200 self->PostMessage(reply.c_str());
201 self->pending_replies_.pop();
203 if (self->pending_replies_.empty()) {
204 self->pending_reply_timer_ = 0;
212 gboolean SpeechInstance::ProcessPendingRequestsCb(gpointer data) {
213 SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
215 WARN("assert(self)");
219 std::string &request = self->pending_requests_.front();
220 if (!self->SendRequest(request.data())) {
222 picojson::object out;
224 picojson::parse(in, request.data(), request.data() + request.size(), NULL);
225 out["reqno"] = in.get("reqno");
226 out["error"] = picojson::value("network");
227 out["message"] = picojson::value("failed to connect to server");
229 self->QueueReply(picojson::value(out).serialize());
231 self->pending_requests_.pop();
233 if (self->pending_requests_.empty()) {
234 self->pending_request_timer_ = 0;
241 void SpeechInstance::QueueReply(const std::string& reply) {
242 pending_replies_.push(reply);
243 if (!pending_reply_timer_) {
244 pending_reply_timer_ = g_idle_add(ProcessPendingRepliesCb,
245 static_cast<gpointer>(this));
249 void SpeechInstance::QueueRequest(const std::string& req) {
250 pending_requests_.push(req);
251 if (!pending_request_timer_) {
252 pending_request_timer_ = g_idle_add(ProcessPendingRequestsCb,
253 static_cast<gpointer>(this));
257 void SpeechInstance::HandleSyncMessage(const char* message) {
258 picojson::value v, out;
261 DBG("Message: %s", message);
263 if (!SendRequest(message)) {
264 picojson::object obj;
266 obj["error"] = picojson::value("network");
267 obj["message"] = picojson::value("server connection failure");
268 out = picojson::value(obj);
271 is_waiting_for_reply_ = true;
272 picojson::parse(v, message, message + strlen(message), &err);
276 const std::string& req_id = v.get("reqno").to_str();
282 if ((size = ReadReply(&reply)) != 0) {
283 is_waiting_for_reply_ = false;
284 picojson::parse(out, reply, reply + size, &err);
287 WARN("Failed to read server reply: %s", strerror(errno));
288 // TODO(avalluri): fill error details in out
290 } else if (out.get("reqno").to_str() == req_id) {
293 QueueReply(out.serialize());
299 SendSyncReply(out.serialize().c_str());
302 void SpeechInstance::HandleMessage(const char* message) {
306 QueueRequest(message);