am c80353d7: am ca5d0d73: Enable clang compilation for mips. am: 08719f3bc6
[platform/upstream/VK-GL-CTS.git] / execserver / xsExecutionServer.cpp
1 /*-------------------------------------------------------------------------
2  * drawElements Quality Program Execution Server
3  * ---------------------------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Test Execution Server.
22  *//*--------------------------------------------------------------------*/
23
24 #include "xsExecutionServer.hpp"
25 #include "deClock.h"
26
27 #include <cstdio>
28
29 using std::vector;
30 using std::string;
31
// Debug tracing helper for this file.
//
// DBG_PRINT takes a complete, parenthesized printf() argument list, which is
// why call sites use double parentheses: DBG_PRINT(("value = %d\n", value)).
// Tracing is currently compiled in; change the condition to 0 to make every
// DBG_PRINT expand to nothing.
#if 1
#	define DBG_PRINT(X) printf X
#else
#	define DBG_PRINT(X)
#endif
37
38 namespace xs
39 {
40
41 inline bool MessageBuilder::isComplete (void) const
42 {
43         if (m_buffer.size() < MESSAGE_HEADER_SIZE)
44                 return false;
45         else
46                 return m_buffer.size() == getMessageSize();
47 }
48
49 const deUint8* MessageBuilder::getMessageData (void) const
50 {
51         return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
52 }
53
54 size_t MessageBuilder::getMessageDataSize (void) const
55 {
56         DE_ASSERT(isComplete());
57         return m_buffer.size() - MESSAGE_HEADER_SIZE;
58 }
59
60 void MessageBuilder::read (ByteBuffer& src)
61 {
62         // Try to get header.
63         if (m_buffer.size() < MESSAGE_HEADER_SIZE)
64         {
65                 while (m_buffer.size() < MESSAGE_HEADER_SIZE &&
66                            src.getNumElements() > 0)
67                         m_buffer.push_back(src.popBack());
68
69                 DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);
70
71                 if (m_buffer.size() == MESSAGE_HEADER_SIZE)
72                 {
73                         // Got whole header, parse it.
74                         Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
75                 }
76         }
77
78         if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
79         {
80                 // We have header.
81                 size_t msgSize                  = getMessageSize();
82                 size_t numBytesLeft             = msgSize - m_buffer.size();
83                 size_t numToRead                = (size_t)de::min(src.getNumElements(), (int)numBytesLeft);
84
85                 if (numToRead > 0)
86                 {
87                         int curBufPos = (int)m_buffer.size();
88                         m_buffer.resize(curBufPos+numToRead);
89                         src.popBack(&m_buffer[curBufPos], (int)numToRead);
90                 }
91         }
92 }
93
94 void MessageBuilder::clear (void)
95 {
96         m_buffer.clear();
97         m_messageType   = MESSAGETYPE_NONE;
98         m_messageSize   = 0;
99 }
100
101 ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode)
102         : TcpServer             (family, port)
103         , m_testDriver  (testProcess)
104         , m_runMode             (runMode)
105 {
106 }
107
108 ExecutionServer::~ExecutionServer (void)
109 {
110 }
111
112 TestDriver* ExecutionServer::acquireTestDriver (void)
113 {
114         if (!m_testDriverLock.tryLock())
115                 throw Error("Failed to acquire test driver");
116
117         return &m_testDriver;
118 }
119
120 void ExecutionServer::releaseTestDriver (TestDriver* driver)
121 {
122         DE_ASSERT(&m_testDriver == driver);
123         DE_UNREF(driver);
124         m_testDriverLock.unlock();
125 }
126
127 ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress)
128 {
129         printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
130         return new ExecutionRequestHandler(this, socket);
131 }
132
133 void ExecutionServer::connectionDone (ConnectionHandler* handler)
134 {
135         if (m_runMode == RUNMODE_SINGLE_EXEC)
136                 m_socket.close();
137
138         TcpServer::connectionDone(handler);
139 }
140
141 ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket)
142         : ConnectionHandler     (server, socket)
143         , m_execServer          (server)
144         , m_testDriver          (DE_NULL)
145         , m_bufferIn            (RECV_BUFFER_SIZE)
146         , m_bufferOut           (SEND_BUFFER_SIZE)
147         , m_run                         (false)
148         , m_sendRecvTmpBuf      (SEND_RECV_TMP_BUFFER_SIZE)
149 {
150         // Set flags.
151         m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC);
152
153         // Init protocol keepalives.
154         initKeepAlives();
155 }
156
157 ExecutionRequestHandler::~ExecutionRequestHandler (void)
158 {
159         if (m_testDriver)
160                 m_execServer->releaseTestDriver(m_testDriver);
161 }
162
163 void ExecutionRequestHandler::handle (void)
164 {
165         DBG_PRINT(("ExecutionRequestHandler::handle()\n"));
166
167         try
168         {
169                 // Process execution session.
170                 processSession();
171         }
172         catch (const std::exception& e)
173         {
174                 printf("ExecutionRequestHandler::run(): %s\n", e.what());
175         }
176
177         DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));
178
179         // Release test driver.
180         if (m_testDriver)
181         {
182                 try
183                 {
184                         m_testDriver->reset();
185                 }
186                 catch (...)
187                 {
188                 }
189                 m_execServer->releaseTestDriver(m_testDriver);
190                 m_testDriver = DE_NULL;
191         }
192
193         // Close connection.
194         if (m_socket->isConnected())
195                 m_socket->shutdown();
196 }
197
198 void ExecutionRequestHandler::acquireTestDriver (void)
199 {
200         DE_ASSERT(!m_testDriver);
201
202         // Try to acquire test driver - may fail.
203         m_testDriver = m_execServer->acquireTestDriver();
204         DE_ASSERT(m_testDriver);
205         m_testDriver->reset();
206
207 }
208
209 void ExecutionRequestHandler::processSession (void)
210 {
211         m_run = true;
212
213         deUint64 lastIoTime = deGetMicroseconds();
214
215         while (m_run)
216         {
217                 bool anyIO = false;
218
219                 // Read from socket to buffer.
220                 anyIO = receive() || anyIO;
221
222                 // Send bytes in buffer.
223                 anyIO = send() || anyIO;
224
225                 // Process incoming data.
226                 if (m_bufferIn.getNumElements() > 0)
227                 {
228                         DE_ASSERT(!m_msgBuilder.isComplete());
229                         m_msgBuilder.read(m_bufferIn);
230                 }
231
232                 if (m_msgBuilder.isComplete())
233                 {
234                         // Process message.
235                         processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize());
236
237                         m_msgBuilder.clear();
238                 }
239
240                 // Keepalives, anyone?
241                 pollKeepAlives();
242
243                 // Poll test driver for IO.
244                 if (m_testDriver)
245                         anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;
246
247                 // If no IO happens in a reasonable amount of time, go to sleep.
248                 {
249                         deUint64 curTime = deGetMicroseconds();
250                         if (anyIO)
251                                 lastIoTime = curTime;
252                         else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000)
253                                 deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
254                         else
255                                 deYield(); // Just give other threads chance to run.
256                 }
257         }
258 }
259
260 void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, size_t dataSize)
261 {
262         switch (type)
263         {
264                 case MESSAGETYPE_HELLO:
265                 {
266                         HelloMessage msg(data, dataSize);
267                         DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
268                         if (msg.version != PROTOCOL_VERSION)
269                                 throw ProtocolError("Unsupported protocol version");
270                         break;
271                 }
272
273                 case MESSAGETYPE_TEST:
274                 {
275                         TestMessage msg(data, dataSize);
276                         DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
277                         break;
278                 }
279
280                 case MESSAGETYPE_KEEPALIVE:
281                 {
282                         KeepAliveMessage msg(data, dataSize);
283                         DBG_PRINT(("KeepAliveMessage\n"));
284                         keepAliveReceived();
285                         break;
286                 }
287
288                 case MESSAGETYPE_EXECUTE_BINARY:
289                 {
290                         ExecuteBinaryMessage msg(data, dataSize);
291                         DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
292                         getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
293                         keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
294                         break;
295                 }
296
297                 case MESSAGETYPE_STOP_EXECUTION:
298                 {
299                         StopExecutionMessage msg(data, dataSize);
300                         DBG_PRINT(("StopExecutionMessage\n"));
301                         getTestDriver()->stopProcess();
302                         break;
303                 }
304
305                 default:
306                         throw ProtocolError("Unsupported message");
307         }
308 }
309
310 void ExecutionRequestHandler::initKeepAlives (void)
311 {
312         deUint64 curTime = deGetMicroseconds();
313         m_lastKeepAliveSent             = curTime;
314         m_lastKeepAliveReceived = curTime;
315 }
316
317 void ExecutionRequestHandler::keepAliveReceived (void)
318 {
319         m_lastKeepAliveReceived = deGetMicroseconds();
320 }
321
322 void ExecutionRequestHandler::pollKeepAlives (void)
323 {
324         deUint64 curTime = deGetMicroseconds();
325
326         // Check that we've got keepalives in timely fashion.
327         if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000)
328                 throw ProtocolError("Keepalive timeout occurred");
329
330         // Send some?
331         if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 &&
332                 m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
333         {
334                 vector<deUint8> buf;
335                 KeepAliveMessage().write(buf);
336                 m_bufferOut.pushFront(&buf[0], (int)buf.size());
337
338                 m_lastKeepAliveSent = deGetMicroseconds();
339         }
340 }
341
342 bool ExecutionRequestHandler::receive (void)
343 {
344         size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree());
345
346         if (maxLen > 0)
347         {
348                 size_t                  numRecv;
349                 deSocketResult  result  = m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);
350
351                 if (result == DE_SOCKETRESULT_SUCCESS)
352                 {
353                         DE_ASSERT(numRecv > 0);
354                         m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv);
355                         return true;
356                 }
357                 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
358                 {
359                         m_run = false;
360                         return true;
361                 }
362                 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
363                         return false;
364                 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
365                         throw ConnectionError("Connection terminated");
366                 else
367                         throw ConnectionError("receive() failed");
368         }
369         else
370                 return false;
371 }
372
373 bool ExecutionRequestHandler::send (void)
374 {
375         size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements());
376
377         if (maxLen > 0)
378         {
379                 m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen);
380
381                 size_t                  numSent;
382                 deSocketResult  result  = m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);
383
384                 if (result == DE_SOCKETRESULT_SUCCESS)
385                 {
386                         DE_ASSERT(numSent > 0);
387                         m_bufferOut.popBack((int)numSent);
388                         return true;
389                 }
390                 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
391                 {
392                         m_run = false;
393                         return true;
394                 }
395                 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
396                         return false;
397                 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
398                         throw ConnectionError("Connection terminated");
399                 else
400                         throw ConnectionError("send() failed");
401         }
402         else
403                 return false;
404 }
405
406 } // xs