1 // Copyright (c) 2014 Intel Corporation. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "speech_instance.h"
12 #include <sys/types.h>
13 #include <sys/select.h>
14 #include <sys/socket.h>
20 #include "speech_logs.h"
// Name of the server socket this instance connects to. The constructor
// copies it into sockaddr_un::sun_path and then overwrites the leading '@'
// with a NUL byte before calling connect(); on Linux a sun_path that starts
// with NUL names a socket in the abstract namespace (see unix(7)).
22 #define WINTHORP_SERVER_SOCKET "@winthorpe.w3c-speech"
// Entry point handed to thread_ by the constructor. |data| is the
// SpeechInstance pointer the constructor passes as `this`.
25 void SpeechInstance::SetupMainloop(void* data) {
26 SpeechInstance* self = reinterpret_cast<SpeechInstance*>(data);
// g_main_loop_run() dispatches sources until g_main_loop_quit() is called on
// the same loop; the destructor is the visible caller of g_main_loop_quit().
28 g_main_loop_run(self->main_loop_);
// Constructs the instance: creates a GLib main loop, hands SetupMainloop and
// `this` to thread_, then opens an AF_UNIX stream socket, connects it to
// WINTHORP_SERVER_SOCKET and registers IOWatchCb as a GLib watch on it.
32 SpeechInstance::SpeechInstance()
// Passing 0 as the context makes the loop use GLib's default main context,
// which is also where g_io_add_watch() and g_idle_add() attach their sources.
34 : main_loop_(g_main_loop_new(0, FALSE))
// NOTE(review): SetupMainloop reads main_loop_. Members are initialized in
// declaration order, not in the order written here, so this relies on
// main_loop_ being declared before thread_ in the header -- confirm.
35 , thread_(SpeechInstance::SetupMainloop, this)
// Zero means "no idle source scheduled" for both timers (see QueueReply /
// QueueRequest, which test these ids with `!`).
42 , pending_request_timer_(0)
43 , pending_reply_timer_(0) {
47 if ((fd_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
48 ERR("Failed to create socket: %s", strerror(errno));
53 struct sockaddr_un server;
54 memset(&server, 0, sizeof(server));
// NOTE(review): this line ends in ',' rather than ';', so the comma operator
// joins the assignment and the strncpy() call below into a single expression
// statement. The effect is the same, but it reads like a typo.
55 server.sun_family = AF_UNIX,
// Copies at most sizeof(sun_path) - 1 bytes; the memset above leaves the
// final byte zero, so the path stays NUL-terminated.
56 strncpy(server.sun_path, WINTHORP_SERVER_SOCKET, sizeof(server.sun_path) - 1);
// The address length is taken while sun_path still begins with '@', i.e.
// before the first byte is zeroed below.
58 int len = SUN_LEN(&server);
// Logs the socket name without its leading '@'.
59 DBG("Socket path : %s", server.sun_path + 1);
// Replace the '@' placeholder with NUL (Linux abstract socket namespace,
// see unix(7)).
60 server.sun_path[0] = 0;
// connect() returns non-zero on failure.
62 if (connect(fd_, (struct sockaddr *)&server, len)) {
63 ERR("Failed to connect to server : %s", strerror(errno));
// Wrap the descriptor in a GIOChannel and have IOWatchCb invoked, with `this`
// as user data, on readable / error / hang-up conditions.
69 channel_ = g_io_channel_unix_new(fd_);
70 GIOCondition flags = GIOCondition(G_IO_IN | G_IO_ERR | G_IO_HUP);
71 watcher_id_ = g_io_add_watch(channel_, flags, IOWatchCb, this);
73 DBG("Connected to server");
// Tears the instance down: removes the socket watch and any scheduled idle
// sources, drops the GIOChannel reference, then quits and unrefs the main
// loop.
76 SpeechInstance::~SpeechInstance() {
// NOTE(review): removed unconditionally. The visible constructor assigns
// watcher_id_ and channel_ only after socket() and connect(); if it bails out
// earlier on failure (not visible here) these may be unset -- confirm.
78 g_source_remove(watcher_id_);
// The timer ids are non-zero only while an idle source is scheduled.
79 if (pending_reply_timer_)
80 g_source_remove(pending_reply_timer_);
81 if (pending_request_timer_)
82 g_source_remove(pending_request_timer_);
86 g_io_channel_unref(channel_);
// NOTE(review): the loop is unreffed right after being asked to quit, while
// thread_ may still be inside g_main_loop_run() -- confirm thread_ is joined
// at a point that makes this safe.
89 g_main_loop_quit(main_loop_);
90 g_main_loop_unref(main_loop_);
// GLib I/O watch callback registered by the constructor via g_io_add_watch()
// for G_IO_IN | G_IO_ERR | G_IO_HUP on the server socket. The user data is
// the SpeechInstance that registered the watch.
95 gboolean SpeechInstance::IOWatchCb(GIOChannel *c,
98 SpeechInstance *self = reinterpret_cast<SpeechInstance*>(userdata);
102 DBG("IO Event on socket : %d", cond);
107 // TODO(avalluri): raise error and close the connection
// A non-zero size from ReadReply() means a reply was read; it is handed to
// PostMessage().
// NOTE(review): ReadReply() stores a malloc()'d buffer in `reply`; no free()
// is visible in this excerpt -- confirm the buffer is released.
113 if ((size = self->ReadReply(&reply))) {
114 self->PostMessage(reply);
// Sends |message| to the server over fd_ as a 4-byte big-endian length
// followed by the message bytes. The length is strlen(message), so the
// terminating NUL is not transmitted. Callers in this file treat a false
// return as a send failure.
127 bool SpeechInstance::SendRequest(const char *message) {
128 uint32_t size = ((uint32_t)strlen(message));
129 uint32_t size_be = htobe32(size);
132 ERR("Socket not connected!");
// NOTE(review): only a negative result is treated as failure; a short write
// of the 4-byte header is not detected.
136 if (send(fd_, static_cast<void*>(&size_be), sizeof(size_be), 0) < 0) {
137 WARN("Failed to send message size: %s", strerror(errno));
// The cast only yields a non-const pointer that can be advanced below;
// send() does not modify the buffer.
141 void *buf = const_cast<void*>(static_cast<const void *>(message));
// Keeps sending while send() accepted fewer than `size` bytes.
// NOTE(review): `len < size` compares send()'s signed result with a uint32_t.
// If len's type (declared on a line not shown) is no wider than 32 bits, a
// -1 result converts to a large unsigned value and the loop exits as though
// the send had completed -- confirm len's type.
143 while (size && (len = send(fd_, buf, size, 0)) < size) {
145 WARN("Failed to send message to server: %s", strerror(errno));
// Skip past the bytes already sent.
149 buf = static_cast<char*>(buf) + len;
155 uint32_t SpeechInstance::ReadReply(char **reply) {
156 uint32_t size_be = 0;
158 if (recv(fd_, static_cast<void*>(&size_be),
159 sizeof(size_be), MSG_WAITALL) < 0) {
160 WARN("Failed read server reply: %s", strerror(errno));
164 uint32_t size = be32toh(size_be);
165 DBG("Received message size : %u", size);
166 char *message = static_cast<char *>(malloc(size + 1));
167 memset(message, 0, size);
169 // FIXME: loop through till complete read
170 if (recv(fd_, message, size, MSG_WAITALL) < 0) {
171 WARN("Failed to read server message with size '%u': %s",
172 size, strerror(errno));
176 message[size] = '\0';
178 DBG("Recived message : %s", message);
180 if (reply) *reply = message;
// GLib idle callback scheduled by QueueReply(). |data| is the SpeechInstance
// that scheduled it. Each invocation delivers the reply at the front of
// pending_replies_ through PostMessage() and removes it from the queue.
186 gboolean SpeechInstance::ProcessPendingRepliesCb(gpointer data) {
187 SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
// A copy of the front element is taken, so the string handed to
// PostMessage() does not alias the queue's storage.
194 std::string reply = self->pending_replies_.front();
195 self->PostMessage(reply.c_str());
196 self->pending_replies_.pop();
// Once the queue is drained, clear the timer id so the next QueueReply()
// schedules a fresh idle source.
198 if (self->pending_replies_.empty()) {
199 self->pending_reply_timer_ = 0;
// GLib idle callback scheduled by QueueRequest(). |data| is the
// SpeechInstance that scheduled it. Each invocation sends the request at the
// front of pending_requests_ with SendRequest(); if that fails, an error
// reply carrying the request's "reqno" is queued with QueueReply() instead.
207 gboolean SpeechInstance::ProcessPendingRequestsCb(gpointer data) {
208 SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
210 WARN("assert(self)");
// `request` is a reference into the queue: it is only valid until the pop()
// further down, and the visible code does not use it after that point.
214 std::string &request = self->pending_requests_.front();
// SendRequest() measures the message with strlen(), so this relies on
// std::string::data() being NUL-terminated (guaranteed since C++11).
215 if (!self->SendRequest(request.data())) {
217 picojson::object out;
// Parse the failed request to recover its "reqno". The error-string argument
// is NULL, so parse errors are not reported.
219 picojson::parse(in, request.data(), request.data() + request.size(), NULL);
220 out["reqno"] = in.get("reqno");
221 out["error"] = picojson::value("network");
222 out["message"] = picojson::value("failed to connect to server");
224 self->QueueReply(picojson::value(out).serialize());
226 self->pending_requests_.pop();
// Once the queue is drained, clear the timer id so the next QueueRequest()
// schedules a fresh idle source.
228 if (self->pending_requests_.empty()) {
229 self->pending_request_timer_ = 0;
// Appends |reply| to pending_replies_ and, when no idle source is currently
// recorded in pending_reply_timer_, schedules ProcessPendingRepliesCb with
// g_idle_add(), passing `this` as its data.
// NOTE(review): pending_replies_ is touched both here and in the idle
// callback with no locking visible in this excerpt -- confirm all callers
// run on the thread that dispatches the idle source.
236 void SpeechInstance::QueueReply(const std::string& reply) {
237 pending_replies_.push(reply);
238 if (!pending_reply_timer_) {
239 pending_reply_timer_ = g_idle_add(ProcessPendingRepliesCb,
240 static_cast<gpointer>(this));
// Appends |req| to pending_requests_ and, when no idle source is currently
// recorded in pending_request_timer_, schedules ProcessPendingRequestsCb with
// g_idle_add(), passing `this` as its data.
// NOTE(review): pending_requests_ is touched both here and in the idle
// callback with no locking visible in this excerpt -- confirm all callers
// run on the thread that dispatches the idle source.
244 void SpeechInstance::QueueRequest(const std::string& req) {
245 pending_requests_.push(req);
246 if (!pending_request_timer_) {
247 pending_request_timer_ = g_idle_add(ProcessPendingRequestsCb,
248 static_cast<gpointer>(this));
// Handles a synchronous request: sends |message| to the server with
// SendRequest() and answers through SendSyncReply() with the serialized
// `out` value.
252 void SpeechInstance::HandleSyncMessage(const char* message) {
253 picojson::value v, out;
256 DBG("Message: %s", message);
// If the send fails, the answer is a JSON object carrying "error" and
// "message" members.
258 if (!SendRequest(message)) {
259 picojson::object obj;
261 obj["error"] = picojson::value("network");
262 obj["message"] = picojson::value("server connection failure");
263 out = picojson::value(obj);
// Parse the request itself so its "reqno" can be compared with the replies.
265 picojson::parse(v, message, message + strlen(message), &err);
// to_str() returns by value; binding the result to a const reference keeps
// that temporary alive for as long as req_id is in scope.
269 const std::string& req_id = v.get("reqno").to_str();
// A non-zero size from ReadReply() means a reply was read; it is parsed into
// `out`.
// NOTE(review): ReadReply() stores a malloc()'d buffer in `reply`; no free()
// is visible in this excerpt -- confirm the buffer is released.
275 if ((size = ReadReply(&reply)) != 0) {
276 picojson::parse(out, reply, reply + size, &err);
279 WARN("Failed to read server reply: %s", strerror(errno));
280 // TODO(avalluri): fill error details in out
// Checks whether the reply just parsed answers this request.
282 } else if (out.get("reqno").to_str() == req_id) {
// NOTE(review): this appears to forward a reply that belongs to a different
// request to the asynchronous reply queue; the surrounding branch structure
// is not fully visible here -- confirm.
285 QueueReply(out.serialize());
291 SendSyncReply(out.serialize().c_str());
// Handles an asynchronous request: |message| is handed to QueueRequest(),
// which copies it into pending_requests_ for ProcessPendingRequestsCb to
// send later.
294 void SpeechInstance::HandleMessage(const char* message) {
// The const char* converts implicitly to the std::string QueueRequest() takes.
298 QueueRequest(message);