1 /*-------------------------------------------------------------------------
2 * drawElements Quality Program Execution Server
3 * ---------------------------------------------
5 * Copyright 2014 The Android Open Source Project
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
21 * \brief Test Execution Server.
22 *//*--------------------------------------------------------------------*/
24 #include "xsExecutionServer.hpp"
// Debug print helper: DBG_PRINT((fmt, ...)) expands to printf (fmt, ...), so callers
// wrap the whole printf argument list in an extra pair of parentheses.
// NOTE(review): the indented "# define" suggests an enclosing preprocessor conditional
// that is not visible here - confirm.
33 # define DBG_PRINT(X) printf X
// Reports whether the buffered bytes form exactly one complete message.
// The buffer includes the header, so completeness means the buffer length equals
// getMessageSize().
41 inline bool MessageBuilder::isComplete (void) const
// Header not fully buffered yet.
// NOTE(review): the statement guarded by this check is not visible in this view.
43 if (m_buffer.size() < MESSAGE_HEADER_SIZE)
46 return m_buffer.size() == getMessageSize();
// Returns a pointer to the first buffered byte after the header, or DE_NULL when the
// buffer holds no bytes beyond MESSAGE_HEADER_SIZE.
49 const deUint8* MessageBuilder::getMessageData (void) const
51 return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
// Returns the number of buffered bytes that follow the header.
// Must only be called on a complete message (checked with DE_ASSERT).
54 size_t MessageBuilder::getMessageDataSize (void) const
56 DE_ASSERT(isComplete());
57 return m_buffer.size() - MESSAGE_HEADER_SIZE;
// Moves bytes from src into the message buffer: first up to MESSAGE_HEADER_SIZE header
// bytes, then the bytes still missing from the message, limited by what src holds.
// NOTE(review): some lines of this function are not visible in this view.
60 void MessageBuilder::read (ByteBuffer& src)
// Header is not fully buffered yet.
63 if (m_buffer.size() < MESSAGE_HEADER_SIZE)
// Pull header bytes one at a time until the header is full or src runs empty.
65 while (m_buffer.size() < MESSAGE_HEADER_SIZE &&
66 src.getNumElements() > 0)
67 m_buffer.push_back(src.popBack());
69 DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);
71 if (m_buffer.size() == MESSAGE_HEADER_SIZE)
73 // Got whole header, parse it.
// Presumably parseHeader fills in m_messageType and m_messageSize - confirm against Message.
74 Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
// Header is available: work out how much of the message is still missing.
78 if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
81 size_t msgSize = getMessageSize();
// Bytes still needed to reach the full message size (the buffer already counts the header).
82 size_t numBytesLeft = msgSize - m_buffer.size();
// Cannot read more than src currently holds.
83 size_t numToRead = (size_t)de::min(src.getNumElements(), (int)numBytesLeft);
// Grow the buffer by numToRead and pop that many bytes from src straight into the new tail.
87 int curBufPos = (int)m_buffer.size();
88 m_buffer.resize(curBufPos+numToRead);
89 src.popBack(&m_buffer[curBufPos], (int)numToRead);
// Sets the message type back to MESSAGETYPE_NONE.
// NOTE(review): any further state reset done by this function is not visible in this view.
94 void MessageBuilder::clear (void)
97 m_messageType = MESSAGETYPE_NONE;
// Constructs the execution server: forwards family and port to the TcpServer base,
// initializes the test driver from testProcess and stores the run mode.
101 ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode)
102 : TcpServer (family, port)
103 , m_testDriver (testProcess)
104 , m_runMode (runMode)
// Destroys the execution server.
108 ExecutionServer::~ExecutionServer (void)
// Hands out the server's test driver.
// Takes m_testDriverLock with a non-waiting tryLock(); if the lock cannot be taken,
// throws Error("Failed to acquire test driver"). The lock is released again by
// releaseTestDriver().
112 TestDriver* ExecutionServer::acquireTestDriver (void)
114 if (!m_testDriverLock.tryLock())
115 throw Error("Failed to acquire test driver");
117 return &m_testDriver;
// Returns the test driver to the server by unlocking m_testDriverLock.
// driver must be the address of this server's own m_testDriver (checked with DE_ASSERT).
120 void ExecutionServer::releaseTestDriver (TestDriver* driver)
122 DE_ASSERT(&m_testDriver == driver);
124 m_testDriverLock.unlock();
// Logs the client's host and port to stdout and returns a newly allocated
// ExecutionRequestHandler bound to this server and the given socket.
// NOTE(review): presumably the caller takes ownership of the returned handler - confirm.
127 ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress)
129 printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
130 return new ExecutionRequestHandler(this, socket);
// Called when a connection handler has finished; delegates to TcpServer::connectionDone().
133 void ExecutionServer::connectionDone (ConnectionHandler* handler)
// Extra step taken only in single-execution run mode.
// NOTE(review): the statement guarded by this check is not visible in this view.
135 if (m_runMode == RUNMODE_SINGLE_EXEC)
138 TcpServer::connectionDone(handler);
// Constructs a handler for one client connection.
// Starts without a test driver (DE_NULL), sizes the in/out buffers and the temporary
// send/receive buffer from their respective constants, and configures the socket.
141 ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket)
142 : ConnectionHandler (server, socket)
143 , m_execServer (server)
144 , m_testDriver (DE_NULL)
145 , m_bufferIn (RECV_BUFFER_SIZE)
146 , m_bufferOut (SEND_BUFFER_SIZE)
148 , m_sendRecvTmpBuf (SEND_RECV_TMP_BUFFER_SIZE)
// Socket is used in non-blocking mode with keepalive and close-on-exec flags set.
151 m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC);
153 // Init protocol keepalives.
// Hands m_testDriver back to the execution server.
// NOTE(review): releaseTestDriver() asserts that the pointer equals the server's own
// driver, so a null m_testDriver must not reach this call - confirm a guard precedes it
// (not visible in this view).
157 ExecutionRequestHandler::~ExecutionRequestHandler (void)
160 m_execServer->releaseTestDriver(m_testDriver);
// Entry point for serving one client connection: processes the execution session,
// logs any std::exception, then resets and releases the test driver and shuts the
// socket down if it is still connected.
// NOTE(review): the try blocks and some statements are not visible in this view.
163 void ExecutionRequestHandler::handle (void)
165 DBG_PRINT(("ExecutionRequestHandler::handle()\n"));
169 // Process execution session.
172 catch (const std::exception& e)
// NOTE(review): the message names run() although it is printed from handle().
174 printf("ExecutionRequestHandler::run(): %s\n", e.what());
177 DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));
179 // Release test driver.
// Reset the driver before handing it back to the server.
184 m_testDriver->reset();
189 m_execServer->releaseTestDriver(m_testDriver);
// Forget the pointer once the driver has been released.
190 m_testDriver = DE_NULL;
194 if (m_socket->isConnected())
195 m_socket->shutdown();
// Acquires the test driver from the execution server and resets it.
// Must be called while no driver is held (checked with DE_ASSERT). The server-side
// acquire throws Error when its driver lock cannot be taken.
198 void ExecutionRequestHandler::acquireTestDriver (void)
200 DE_ASSERT(!m_testDriver);
202 // Try to acquire test driver - may fail.
203 m_testDriver = m_execServer->acquireTestDriver();
204 DE_ASSERT(m_testDriver);
205 m_testDriver->reset();
// Drives one client session: moves data between the socket and the in/out buffers,
// assembles and dispatches incoming messages, polls the test driver, and sleeps or
// yields depending on how long ago IO last happened.
// NOTE(review): the loop construct, the declaration of anyIO and several branch
// conditions are not visible in this view.
209 void ExecutionRequestHandler::processSession (void)
// Reference time for the idle detection below; starts at the current time.
213 deUint64 lastIoTime = deGetMicroseconds();
219 // Read from socket to buffer.
220 anyIO = receive() || anyIO;
222 // Send bytes in buffer.
223 anyIO = send() || anyIO;
225 // Process incoming data.
226 if (m_bufferIn.getNumElements() > 0)
// A completed message is dispatched and cleared below, so the builder must be mid-message here.
228 DE_ASSERT(!m_msgBuilder.isComplete());
229 m_msgBuilder.read(m_bufferIn);
232 if (m_msgBuilder.isComplete())
// Dispatch on the message type, passing the payload that follows the header.
235 processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize());
// Reset the builder after dispatch.
237 m_msgBuilder.clear();
240 // Keepalives, anyone?
243 // Poll test driver for IO.
245 anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;
247 // If no IO happens in a reasonable amount of time, go to sleep.
249 deUint64 curTime = deGetMicroseconds();
251 lastIoTime = curTime;
// Threshold is scaled by 1000 before comparing against the microsecond clock
// (presumably SERVER_IDLE_THRESHOLD is in milliseconds - confirm).
252 else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000)
253 deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
255 deYield(); // Just give other threads chance to run.
// Decodes one received message of the given type from data/dataSize and acts on it.
// Throws ProtocolError for an unsupported protocol version or an unsupported message.
// NOTE(review): the switch statement, break statements and some case bodies are not
// fully visible in this view.
260 void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, size_t dataSize)
// Handshake: the client's protocol version must equal PROTOCOL_VERSION.
264 case MESSAGETYPE_HELLO:
266 HelloMessage msg(data, dataSize);
267 DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
268 if (msg.version != PROTOCOL_VERSION)
269 throw ProtocolError("Unsupported protocol version");
// Test message: decoded and logged via DBG_PRINT.
273 case MESSAGETYPE_TEST:
275 TestMessage msg(data, dataSize);
276 DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
// Keepalive from the client.
280 case MESSAGETYPE_KEEPALIVE:
282 KeepAliveMessage msg(data, dataSize);
283 DBG_PRINT(("KeepAliveMessage\n"));
// Start the requested binary through the test driver.
288 case MESSAGETYPE_EXECUTE_BINARY:
290 ExecuteBinaryMessage msg(data, dataSize);
// Only the first 10 characters of the case list are logged.
291 DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
292 getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
293 keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
// Stop the running process through the test driver.
297 case MESSAGETYPE_STOP_EXECUTION:
299 StopExecutionMessage msg(data, dataSize);
300 DBG_PRINT(("StopExecutionMessage\n"));
301 getTestDriver()->stopProcess();
// Presumably the default branch: any other message type is rejected - confirm.
306 throw ProtocolError("Unsupported message");
// Initializes both keepalive timestamps (last sent, last received) to the current
// deGetMicroseconds() time.
310 void ExecutionRequestHandler::initKeepAlives (void)
312 deUint64 curTime = deGetMicroseconds();
313 m_lastKeepAliveSent = curTime;
314 m_lastKeepAliveReceived = curTime;
// Records the current deGetMicroseconds() time as the moment the last keepalive was received.
317 void ExecutionRequestHandler::keepAliveReceived (void)
319 m_lastKeepAliveReceived = deGetMicroseconds();
// Enforces the keepalive timeout and queues an outgoing keepalive when one is due.
// Throws ProtocolError("Keepalive timeout occurred") when the time since the last
// received keepalive exceeds KEEPALIVE_TIMEOUT*1000.
// NOTE(review): the *1000 scaling suggests the constants are in milliseconds while the
// clock is in microseconds - confirm. The declaration of buf is not visible in this view.
322 void ExecutionRequestHandler::pollKeepAlives (void)
324 deUint64 curTime = deGetMicroseconds();
326 // Check that we've got keepalives in timely fashion.
327 if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000)
328 throw ProtocolError("Keepalive timeout occurred");
// Send only when the interval has elapsed and the output buffer has at least
// MESSAGE_HEADER_SIZE bytes free.
331 if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 &&
332 m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
// Serialize a keepalive message into buf and queue it in the output buffer.
335 KeepAliveMessage().write(buf);
336 m_bufferOut.pushFront(&buf[0], (int)buf.size());
// The send timestamp is re-read from the clock rather than reusing curTime.
338 m_lastKeepAliveSent = deGetMicroseconds();
// Receives bytes from the socket into the temporary buffer and queues them in m_bufferIn.
// Throws ConnectionError when the connection is terminated or the receive fails otherwise.
// NOTE(review): the return statements, the numRecv declaration and the bodies of the
// CONNECTION_CLOSED and WOULD_BLOCK branches are not visible in this view.
342 bool ExecutionRequestHandler::receive (void)
// Never receive more than the temporary buffer holds or m_bufferIn has room for.
344 size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree());
349 deSocketResult result = m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);
351 if (result == DE_SOCKETRESULT_SUCCESS)
353 DE_ASSERT(numRecv > 0);
// Queue the received bytes for message assembly.
354 m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv);
357 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
362 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
364 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
365 throw ConnectionError("Connection terminated");
// Any other result is treated as a failure.
367 throw ConnectionError("receive() failed");
// Sends queued bytes from m_bufferOut to the socket via the temporary buffer.
// Throws ConnectionError when the connection is terminated or the send fails otherwise.
// NOTE(review): the return statements, the numSent declaration and the bodies of the
// CONNECTION_CLOSED and WOULD_BLOCK branches are not visible in this view.
373 bool ExecutionRequestHandler::send (void)
// Never send more than the temporary buffer holds or m_bufferOut has queued.
375 size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements());
// Copy without removing: bytes leave m_bufferOut only after the socket accepts them.
379 m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen);
382 deSocketResult result = m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);
384 if (result == DE_SOCKETRESULT_SUCCESS)
386 DE_ASSERT(numSent > 0);
// Drop exactly the bytes the socket accepted.
387 m_bufferOut.popBack((int)numSent);
390 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
395 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
397 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
398 throw ConnectionError("Connection terminated");
// Any other result is treated as a failure.
400 throw ConnectionError("send() failed");