am fef7d7ab: (-s ours) am 48c21c9a: (-s ours) am 0870de9a: DO NOT MERGE Do not requir...
[platform/upstream/VK-GL-CTS.git] / executor / xeTcpIpLink.cpp
1 /*-------------------------------------------------------------------------
2  * drawElements Quality Program Test Executor
3  * ------------------------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Tcp/Ip communication link.
22  *//*--------------------------------------------------------------------*/
23
24 #include "xeTcpIpLink.hpp"
25 #include "xsProtocol.hpp"
26 #include "deClock.h"
27 #include "deInt32.h"
28
29 namespace xe
30 {
31
// Dimensions of the outgoing byte queue. Both values are handed to the
// TcpIpSendThread::m_buffer constructor; SEND_BUFFER_BLOCK_SIZE is also the
// size of the stack buffer the send thread drains into.
enum
{
	SEND_BUFFER_NUM_BLOCKS	= 64,
	SEND_BUFFER_BLOCK_SIZE	= 1024
};
37
38 // Utilities for writing messages out.
39
40 static void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize)
41 {
42         deUint8 hdr[xs::MESSAGE_HEADER_SIZE];
43         xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44         dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
45 }
46
47 static void writeKeepalive (de::BlockBuffer<deUint8>& dst)
48 {
49         writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
50         dst.flush();
51 }
52
53 static void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList)
54 {
55         int             nameSize                        = (int)strlen(name)             + 1;
56         int             paramsSize                      = (int)strlen(params)   + 1;
57         int             workDirSize                     = (int)strlen(workDir)  + 1;
58         int             caseListSize            = (int)strlen(caseList) + 1;
59         int             totalSize                       = xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
60
61         writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
62         dst.write(nameSize,             (const deUint8*)name);
63         dst.write(paramsSize,   (const deUint8*)params);
64         dst.write(workDirSize,  (const deUint8*)workDir);
65         dst.write(caseListSize, (const deUint8*)caseList);
66         dst.flush();
67 }
68
69 static void writeStopExecution (de::BlockBuffer<deUint8>& dst)
70 {
71         writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
72         dst.flush();
73 }
74
75 // TcpIpLinkState
76
77 TcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr)
78         : m_state                                       (initialState)
79         , m_error                                       (initialErr)
80         , m_lastKeepaliveReceived       (0)
81         , m_stateChangedCallback        (DE_NULL)
82         , m_testLogDataCallback         (DE_NULL)
83         , m_infoLogDataCallback         (DE_NULL)
84         , m_userPtr                                     (DE_NULL)
85 {
86 }
87
88 TcpIpLinkState::~TcpIpLinkState (void)
89 {
90 }
91
92 CommLinkState TcpIpLinkState::getState (void) const
93 {
94         de::ScopedLock lock(m_lock);
95
96         return m_state;
97 }
98
99 CommLinkState TcpIpLinkState::getState (std::string& error) const
100 {
101         de::ScopedLock lock(m_lock);
102
103         error = m_error;
104         return m_state;
105 }
106
107 void TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr)
108 {
109         de::ScopedLock lock(m_lock);
110
111         m_stateChangedCallback          = stateChangedCallback;
112         m_testLogDataCallback           = testLogDataCallback;
113         m_infoLogDataCallback           = infoLogDataCallback;
114         m_userPtr                                       = userPtr;
115 }
116
117 void TcpIpLinkState::setState (CommLinkState state, const char* error)
118 {
119         CommLink::StateChangedFunc      callback        = DE_NULL;
120         void*                                           userPtr         = DE_NULL;
121
122         {
123                 de::ScopedLock lock(m_lock);
124
125                 m_state = state;
126                 m_error = error;
127
128                 callback        = m_stateChangedCallback;
129                 userPtr         = m_userPtr;
130         }
131
132         if (callback)
133                 callback(userPtr, state, error);
134 }
135
136 void TcpIpLinkState::onTestLogData (const deUint8* bytes, size_t numBytes) const
137 {
138         CommLink::LogDataFunc   callback        = DE_NULL;
139         void*                                   userPtr         = DE_NULL;
140
141         m_lock.lock();
142         callback        = m_testLogDataCallback;
143         userPtr         = m_userPtr;
144         m_lock.unlock();
145
146         if (callback)
147                 callback(userPtr, bytes, numBytes);
148 }
149
150 void TcpIpLinkState::onInfoLogData (const deUint8* bytes, size_t numBytes) const
151 {
152         CommLink::LogDataFunc   callback        = DE_NULL;
153         void*                                   userPtr         = DE_NULL;
154
155         m_lock.lock();
156         callback        = m_infoLogDataCallback;
157         userPtr         = m_userPtr;
158         m_lock.unlock();
159
160         if (callback)
161                 callback(userPtr, bytes, numBytes);
162 }
163
164 void TcpIpLinkState::onKeepaliveReceived (void)
165 {
166         de::ScopedLock lock(m_lock);
167         m_lastKeepaliveReceived = deGetMicroseconds();
168 }
169
170 deUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const
171 {
172         de::ScopedLock lock(m_lock);
173         return m_lastKeepaliveReceived;
174 }
175
176 // TcpIpSendThread
177
178 TcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state)
179         : m_socket              (socket)
180         , m_state               (state)
181         , m_buffer              (SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
182         , m_isRunning   (false)
183 {
184 }
185
186 TcpIpSendThread::~TcpIpSendThread (void)
187 {
188 }
189
190 void TcpIpSendThread::start (void)
191 {
192         DE_ASSERT(!m_isRunning);
193
194         // Reset state.
195         m_buffer.clear();
196         m_isRunning = true;
197
198         de::Thread::start();
199 }
200
201 void TcpIpSendThread::run (void)
202 {
203         try
204         {
205                 deUint8 buf[SEND_BUFFER_BLOCK_SIZE];
206
207                 while (!m_buffer.isCanceled())
208                 {
209                         size_t                  numToSend       = 0;
210                         size_t                  numSent         = 0;
211                         deSocketResult  result          = DE_SOCKETRESULT_LAST;
212
213                         try
214                         {
215                                 // Wait for single byte and then try to read more.
216                                 m_buffer.read(1, &buf[0]);
217                                 numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]);
218                         }
219                         catch (const de::BlockBuffer<deUint8>::CanceledException&)
220                         {
221                                 // Handled in loop condition.
222                         }
223
224                         while (numSent < numToSend)
225                         {
226                                 result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent);
227
228                                 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
229                                         XE_FAIL("Connection closed");
230                                 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
231                                         XE_FAIL("Connection terminated");
232                                 else if (result == DE_SOCKETRESULT_ERROR)
233                                         XE_FAIL("Socket error");
234                                 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
235                                 {
236                                         // \note Socket should not be in non-blocking mode.
237                                         DE_ASSERT(numSent == 0);
238                                         deYield();
239                                 }
240                                 else
241                                         DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
242                         }
243                 }
244         }
245         catch (const std::exception& e)
246         {
247                 m_state.setState(COMMLINKSTATE_ERROR, e.what());
248         }
249 }
250
251 void TcpIpSendThread::stop (void)
252 {
253         if (m_isRunning)
254         {
255                 m_buffer.cancel();
256                 join();
257                 m_isRunning = false;
258         }
259 }
260
261 // TcpIpRecvThread
262
263 TcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state)
264         : m_socket              (socket)
265         , m_state               (state)
266         , m_curMsgPos   (0)
267         , m_isRunning   (false)
268 {
269 }
270
271 TcpIpRecvThread::~TcpIpRecvThread (void)
272 {
273 }
274
275 void TcpIpRecvThread::start (void)
276 {
277         DE_ASSERT(!m_isRunning);
278
279         // Reset state.
280         m_curMsgPos = 0;
281         m_isRunning = true;
282
283         de::Thread::start();
284 }
285
286 void TcpIpRecvThread::run (void)
287 {
288         try
289         {
290                 for (;;)
291                 {
292                         bool                            hasHeader               = m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
293                         bool                            hasPayload              = false;
294                         size_t                          messageSize             = 0;
295                         xs::MessageType         messageType             = (xs::MessageType)0;
296
297                         if (hasHeader)
298                         {
299                                 xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
300                                 hasPayload = m_curMsgPos >= messageSize;
301                         }
302
303                         if (hasPayload)
304                         {
305                                 // Process message.
306                                 handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE);
307                                 m_curMsgPos = 0;
308                         }
309                         else
310                         {
311                                 // Try to receive missing bytes.
312                                 size_t                          curSize                 = hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE;
313                                 size_t                          bytesToRecv             = curSize-m_curMsgPos;
314                                 size_t                          numRecv                 = 0;
315                                 deSocketResult          result                  = DE_SOCKETRESULT_LAST;
316
317                                 if (m_curMsgBuf.size() < curSize)
318                                         m_curMsgBuf.resize(curSize);
319
320                                 result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
321
322                                 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
323                                         XE_FAIL("Connection closed");
324                                 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
325                                         XE_FAIL("Connection terminated");
326                                 else if (result == DE_SOCKETRESULT_ERROR)
327                                         XE_FAIL("Socket error");
328                                 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
329                                 {
330                                         // \note Socket should not be in non-blocking mode.
331                                         DE_ASSERT(numRecv == 0);
332                                         deYield();
333                                 }
334                                 else
335                                 {
336                                         DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
337                                         DE_ASSERT(numRecv <= bytesToRecv);
338                                         m_curMsgPos += numRecv;
339                                         // Continue receiving bytes / handle message in next iter.
340                                 }
341                         }
342                 }
343         }
344         catch (const std::exception& e)
345         {
346                 m_state.setState(COMMLINKSTATE_ERROR, e.what());
347         }
348 }
349
350 void TcpIpRecvThread::stop (void)
351 {
352         if (m_isRunning)
353         {
354                 // \note Socket must be closed before terminating receive thread.
355                 XE_CHECK(!m_socket.isReceiveOpen());
356
357                 join();
358                 m_isRunning = false;
359         }
360 }
361
362 void TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, size_t dataSize)
363 {
364         switch (messageType)
365         {
366                 case xs::MESSAGETYPE_KEEPALIVE:
367                         m_state.onKeepaliveReceived();
368                         break;
369
370                 case xs::MESSAGETYPE_PROCESS_STARTED:
371                         XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
372                         m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
373                         break;
374
375                 case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
376                 {
377                         xs::ProcessLaunchFailedMessage msg(data, dataSize);
378                         XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message");
379                         m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
380                         break;
381                 }
382
383                 case xs::MESSAGETYPE_PROCESS_FINISHED:
384                 {
385                         XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
386                         xs::ProcessFinishedMessage msg(data, dataSize);
387                         m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
388                         DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
389                         break;
390                 }
391
392                 case xs::MESSAGETYPE_PROCESS_LOG_DATA:
393                 case xs::MESSAGETYPE_INFO:
394                         // Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
395                         if (data[dataSize-1] == 0)
396                                 dataSize -= 1;
397
398                         if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
399                         {
400                                 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message");
401                                 m_state.onTestLogData(&data[0], dataSize);
402                         }
403                         else
404                                 m_state.onInfoLogData(&data[0], dataSize);
405                         break;
406
407                 default:
408                         XE_FAIL("Unknown message");
409         }
410 }
411
412 // TcpIpLink
413
414 TcpIpLink::TcpIpLink (void)
415         : m_state                       (COMMLINKSTATE_ERROR, "Not connected")
416         , m_sendThread          (m_socket, m_state)
417         , m_recvThread          (m_socket, m_state)
418         , m_keepaliveTimer      (DE_NULL)
419 {
420         m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
421         XE_CHECK(m_keepaliveTimer);
422 }
423
424 TcpIpLink::~TcpIpLink (void)
425 {
426         try
427         {
428                 closeConnection();
429         }
430         catch (...)
431         {
432                 // Can't do much except to ignore error.
433         }
434         deTimer_destroy(m_keepaliveTimer);
435 }
436
437 void TcpIpLink::closeConnection (void)
438 {
439         {
440                 deSocketState state = m_socket.getState();
441                 if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
442                         m_socket.shutdown();
443         }
444
445         if (deTimer_isActive(m_keepaliveTimer))
446                 deTimer_disable(m_keepaliveTimer);
447
448         if (m_sendThread.isRunning())
449                 m_sendThread.stop();
450
451         if (m_recvThread.isRunning())
452                 m_recvThread.stop();
453
454         if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
455                 m_socket.close();
456 }
457
458 void TcpIpLink::connect (const de::SocketAddress& address)
459 {
460         XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
461         XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
462         XE_CHECK(!m_sendThread.isRunning());
463         XE_CHECK(!m_recvThread.isRunning());
464
465         m_socket.connect(address);
466
467         try
468         {
469                 // Clear error and set state to ready.
470                 m_state.setState(COMMLINKSTATE_READY, "");
471                 m_state.onKeepaliveReceived();
472
473                 // Launch threads.
474                 m_sendThread.start();
475                 m_recvThread.start();
476
477                 XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
478         }
479         catch (const std::exception& e)
480         {
481                 closeConnection();
482                 m_state.setState(COMMLINKSTATE_ERROR, e.what());
483                 throw;
484         }
485 }
486
487 void TcpIpLink::disconnect (void)
488 {
489         try
490         {
491                 closeConnection();
492                 m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
493         }
494         catch (const std::exception& e)
495         {
496                 m_state.setState(COMMLINKSTATE_ERROR, e.what());
497         }
498 }
499
500 void TcpIpLink::reset (void)
501 {
502         // \note Just clears error state if we are connected.
503         if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
504         {
505                 m_state.setState(COMMLINKSTATE_READY, "");
506
507                 // \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
508         }
509         else
510                 disconnect(); // Abnormal state/usage. Disconnect socket.
511 }
512
513 void TcpIpLink::keepaliveTimerCallback (void* ptr)
514 {
515         TcpIpLink*      link                    = static_cast<TcpIpLink*>(ptr);
516         deUint64        lastKeepalive   = link->m_state.getLastKeepaliveRecevied();
517         deUint64        curTime                 = deGetMicroseconds();
518
519         // Check for timeout.
520         if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000)
521                 link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
522
523         // Enqueue new keepalive.
524         try
525         {
526                 writeKeepalive(link->m_sendThread.getBuffer());
527         }
528         catch (const de::BlockBuffer<deUint8>::CanceledException&)
529         {
530                 // Ignore. Can happen in connection teardown.
531         }
532 }
533
534 CommLinkState TcpIpLink::getState (void) const
535 {
536         return m_state.getState();
537 }
538
539 CommLinkState TcpIpLink::getState (std::string& message) const
540 {
541         return m_state.getState(message);
542 }
543
544 void TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr)
545 {
546         m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
547 }
548
549 void TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList)
550 {
551         XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
552
553         m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
554         writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
555 }
556
557 void TcpIpLink::stopTestProcess (void)
558 {
559         XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
560         writeStopExecution(m_sendThread.getBuffer());
561 }
562
563 } // xe