1 // Copyright (c) 2014 Intel Corporation. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "speech_instance.h"
12 #include <sys/types.h>
13 #include <sys/select.h>
14 #include <sys/socket.h>
20 #include "speech_logs.h"
// Address of the server socket the SpeechInstance constructor connects to.
// The leading '@' is only a placeholder: the constructor copies this string
// into sockaddr_un::sun_path and overwrites the first byte with NUL before
// calling connect() (presumably to address a Linux abstract-namespace
// socket -- confirm against the server side).
22 #define WINTHORP_SERVER_SOCKET "@winthorpe.w3c-speech"
// Constructs the instance and opens the connection to the server: creates an
// AF_UNIX stream socket, connects it to WINTHORP_SERVER_SOCKET and registers
// a GLib IO watch (IOWatchCb, with this instance as user data) for input,
// error and hang-up conditions. Socket and connect failures are logged
// through ERR().
24 SpeechInstance::SpeechInstance()
28 , pending_request_timer_(0)
29 , pending_reply_timer_(0) {
30 if ((fd_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
31 ERR("Failed to create socket: %s", strerror(errno));
36 struct sockaddr_un server;
37 memset(&server, 0, sizeof(server));
// NOTE(review): the next line ends with ',' instead of ';', so it forms one
// comma expression together with the strncpy() call that follows. The result
// is the same, but it looks like a typo -- confirm and replace with ';'.
38 server.sun_family = AF_UNIX,
39 strncpy(server.sun_path, WINTHORP_SERVER_SOCKET, sizeof(server.sun_path) - 1);
// The address length is computed while sun_path still starts with '@';
// SUN_LEN depends on the string length of sun_path, so this must happen
// before the first byte is cleared below.
41 int len = SUN_LEN(&server);
42 DBG("Socket path : %s", server.sun_path + 1);
// Replace the '@' placeholder with a NUL byte before connecting.
43 server.sun_path[0] = 0;
45 if (connect(fd_, (struct sockaddr *)&server, len)) {
46 ERR("Failed to connect to server : %s", strerror(errno));
// Watch the connected socket from the GLib main loop; events are delivered
// to IOWatchCb.
52 channel_ = g_io_channel_unix_new(fd_);
53 GIOCondition flags = GIOCondition(G_IO_IN | G_IO_ERR | G_IO_HUP);
54 watcher_id_ = g_io_add_watch(channel_, flags, IOWatchCb, this);
56 DBG("Connected to server");
// Releases the GLib resources held by the instance: removes the IO watch and
// any still-scheduled idle sources, then drops the reference on the IO
// channel.
59 SpeechInstance::~SpeechInstance() {
61 g_source_remove(watcher_id_);
// The idle source ids are zero whenever no callback is scheduled (they are
// initialised to 0 and reset to 0 once the corresponding queue is drained).
62 if (pending_reply_timer_)
63 g_source_remove(pending_reply_timer_);
64 if (pending_request_timer_)
65 g_source_remove(pending_request_timer_);
69 g_io_channel_unref(channel_);
// GLib IO watch callback for the server socket, registered by the
// constructor. The user data pointer is the SpeechInstance that installed
// the watch. A reply read from the socket is handed to PostMessage().
73 gboolean SpeechInstance::IOWatchCb(GIOChannel *c,
76 SpeechInstance *self = reinterpret_cast<SpeechInstance*>(userdata);
80 DBG("IO Event on socket : %d", cond);
85 // TODO(avalluri): raise error and close the connection
// A zero return from ReadReply() is treated as "nothing to post".
91 if ((size = self->ReadReply(&reply))) {
92 self->PostMessage(reply);
105 bool SpeechInstance::SendRequest(const char *message) {
106 uint32_t size = ((uint32_t)strlen(message));
107 uint32_t size_be = htobe32(size);
110 ERR("Socket not connected!");
114 if (send(fd_, static_cast<void*>(&size_be), sizeof(size_be), 0) < 0) {
115 WARN("Failed to send message size: %s", strerror(errno));
119 void *buf = const_cast<void*>(static_cast<const void *>(message));
121 while (size && (len = send(fd_, buf, size, 0)) < size) {
123 WARN("Failed to send message to server: %s", strerror(errno));
127 buf = static_cast<char*>(buf) + len;
133 uint32_t SpeechInstance::ReadReply(char **reply) {
134 uint32_t size_be = 0;
136 if (recv(fd_, static_cast<void*>(&size_be),
137 sizeof(size_be), MSG_WAITALL) < 0) {
138 WARN("Failed read server reply: %s", strerror(errno));
142 uint32_t size = be32toh(size_be);
143 DBG("Received message size : %u", size);
144 char *message = static_cast<char *>(malloc(size + 1));
145 memset(message, 0, size);
147 // FIXME: loop through till complete read
148 if (recv(fd_, message, size, MSG_WAITALL) < 0) {
149 WARN("Failed to read server message with size '%u': %s",
150 size, strerror(errno));
154 message[size] = '\0';
156 DBG("Recived message : %s", message);
158 if (reply) *reply = message;
// Idle callback scheduled by QueueReply(). |data| is the SpeechInstance that
// scheduled it. Takes the reply at the front of pending_replies_, hands it
// to PostMessage() and removes it from the queue.
164 gboolean SpeechInstance::ProcessPendingRepliesCb(gpointer data) {
165 SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
// The reply is copied out of the queue before being posted and popped.
172 std::string reply = self->pending_replies_.front();
173 self->PostMessage(reply.c_str());
174 self->pending_replies_.pop();
// Queue drained: clear the stored source id so that the next QueueReply()
// call schedules a fresh idle callback.
176 if (self->pending_replies_.empty()) {
177 self->pending_reply_timer_ = 0;
// Idle callback scheduled by QueueRequest(). |data| is the SpeechInstance
// that scheduled it. Sends the request at the front of pending_requests_ to
// the server; if sending fails, an error reply carrying the request's
// "reqno" is queued through QueueReply() instead.
185 gboolean SpeechInstance::ProcessPendingRequestsCb(gpointer data) {
186 SpeechInstance *self = reinterpret_cast<SpeechInstance*>(data);
188 WARN("assert(self)");
// NOTE(review): |request| is a reference into the queue and becomes invalid
// after the pop() further down.
192 std::string &request = self->pending_requests_.front();
193 if (!self->SendRequest(request.data())) {
// Build a JSON error object. "reqno" is copied from the parsed request,
// presumably so the receiver can match the failure to its request -- confirm.
195 picojson::object out;
197 picojson::parse(in, request.data(), request.data() + request.size(), NULL);
198 out["reqno"] = in.get("reqno");
199 out["error"] = picojson::value("network");
200 out["message"] = picojson::value("failed to connect to server");
202 self->QueueReply(picojson::value(out).serialize());
204 self->pending_requests_.pop();
// Queue drained: clear the stored source id so that the next QueueRequest()
// call schedules a fresh idle callback.
206 if (self->pending_requests_.empty()) {
207 self->pending_request_timer_ = 0;
214 void SpeechInstance::QueueReply(const std::string& reply) {
215 pending_replies_.push(reply);
216 if (!pending_reply_timer_) {
217 pending_reply_timer_ = g_idle_add(ProcessPendingRepliesCb,
218 static_cast<gpointer>(this));
222 void SpeechInstance::QueueRequest(const std::string& req) {
223 pending_requests_.push(req);
224 if (!pending_request_timer_) {
225 pending_request_timer_ = g_idle_add(ProcessPendingRequestsCb,
226 static_cast<gpointer>(this));
// Handles a synchronous message: forwards |message| to the server with
// SendRequest(), reads server replies with ReadReply(), and answers the
// caller through SendSyncReply() with the serialized |out| value.
230 void SpeechInstance::HandleSyncMessage(const char* message) {
231 picojson::value v, out;
234 DBG("Message: %s", message);
// Sending failed: |out| becomes a "network" error object.
236 if (!SendRequest(message)) {
237 picojson::object obj;
239 obj["error"] = picojson::value("network");
240 obj["message"] = picojson::value("server connection failure");
241 out = picojson::value(obj);
// Parse the request to obtain its "reqno", which is compared with the
// "reqno" of the reply further down.
243 picojson::parse(v, message, message + strlen(message), &err);
247 const std::string& req_id = v.get("reqno").to_str();
// A non-zero size means ReadReply() produced a reply buffer; it is parsed as
// JSON into |out|.
253 if ((size = ReadReply(&reply)) != 0) {
254 picojson::parse(out, reply, reply + size, &err);
257 WARN("Failed to read server reply: %s", strerror(errno));
258 // TODO(avalluri): fill error details in out
// NOTE(review): presumably a reply whose "reqno" matches is the answer to
// this request, while other replies are passed to QueueReply() for
// asynchronous delivery -- confirm against the full branch structure.
260 } else if (out.get("reqno").to_str() == req_id) {
263 QueueReply(out.serialize());
269 SendSyncReply(out.serialize().c_str());
// Handles an asynchronous message by placing it on the request queue via
// QueueRequest(); the request is sent to the server later from
// ProcessPendingRequestsCb().
272 void SpeechInstance::HandleMessage(const char* message) {
276 QueueRequest(message);