1 /*-------------------------------------------------------------------------
2 * drawElements Quality Program Test Executor
3 * ------------------------------------------
5 * Copyright 2014 The Android Open Source Project
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
21 * \brief Tcp/Ip communication link.
22 *//*--------------------------------------------------------------------*/
24 #include "xeTcpIpLink.hpp"
25 #include "xsProtocol.hpp"
34 SEND_BUFFER_BLOCK_SIZE = 1024,
35 SEND_BUFFER_NUM_BLOCKS = 64
38 // Utilities for writing messages out.
40 static void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize)
42 deUint8 hdr[xs::MESSAGE_HEADER_SIZE];
43 xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44 dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
47 static void writeKeepalive (de::BlockBuffer<deUint8>& dst)
49 writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
53 static void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList)
55 int nameSize = (int)strlen(name) + 1;
56 int paramsSize = (int)strlen(params) + 1;
57 int workDirSize = (int)strlen(workDir) + 1;
58 int caseListSize = (int)strlen(caseList) + 1;
59 int totalSize = xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
61 writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
62 dst.write(nameSize, (const deUint8*)name);
63 dst.write(paramsSize, (const deUint8*)params);
64 dst.write(workDirSize, (const deUint8*)workDir);
65 dst.write(caseListSize, (const deUint8*)caseList);
69 static void writeStopExecution (de::BlockBuffer<deUint8>& dst)
71 writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
77 TcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr)
78 : m_state (initialState)
79 , m_error (initialErr)
80 , m_lastKeepaliveReceived (0)
81 , m_stateChangedCallback (DE_NULL)
82 , m_testLogDataCallback (DE_NULL)
83 , m_infoLogDataCallback (DE_NULL)
88 TcpIpLinkState::~TcpIpLinkState (void)
92 CommLinkState TcpIpLinkState::getState (void) const
94 de::ScopedLock lock(m_lock);
99 CommLinkState TcpIpLinkState::getState (std::string& error) const
101 de::ScopedLock lock(m_lock);
107 void TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr)
109 de::ScopedLock lock(m_lock);
111 m_stateChangedCallback = stateChangedCallback;
112 m_testLogDataCallback = testLogDataCallback;
113 m_infoLogDataCallback = infoLogDataCallback;
117 void TcpIpLinkState::setState (CommLinkState state, const char* error)
119 CommLink::StateChangedFunc callback = DE_NULL;
120 void* userPtr = DE_NULL;
123 de::ScopedLock lock(m_lock);
128 callback = m_stateChangedCallback;
133 callback(userPtr, state, error);
136 void TcpIpLinkState::onTestLogData (const deUint8* bytes, size_t numBytes) const
138 CommLink::LogDataFunc callback = DE_NULL;
139 void* userPtr = DE_NULL;
142 callback = m_testLogDataCallback;
147 callback(userPtr, bytes, numBytes);
150 void TcpIpLinkState::onInfoLogData (const deUint8* bytes, size_t numBytes) const
152 CommLink::LogDataFunc callback = DE_NULL;
153 void* userPtr = DE_NULL;
156 callback = m_infoLogDataCallback;
161 callback(userPtr, bytes, numBytes);
164 void TcpIpLinkState::onKeepaliveReceived (void)
166 de::ScopedLock lock(m_lock);
167 m_lastKeepaliveReceived = deGetMicroseconds();
170 deUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const
172 de::ScopedLock lock(m_lock);
173 return m_lastKeepaliveReceived;
178 TcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state)
181 , m_buffer (SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
182 , m_isRunning (false)
186 TcpIpSendThread::~TcpIpSendThread (void)
190 void TcpIpSendThread::start (void)
192 DE_ASSERT(!m_isRunning);
201 void TcpIpSendThread::run (void)
205 deUint8 buf[SEND_BUFFER_BLOCK_SIZE];
207 while (!m_buffer.isCanceled())
209 size_t numToSend = 0;
211 deSocketResult result = DE_SOCKETRESULT_LAST;
215 // Wait for single byte and then try to read more.
216 m_buffer.read(1, &buf[0]);
217 numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]);
219 catch (const de::BlockBuffer<deUint8>::CanceledException&)
221 // Handled in loop condition.
224 while (numSent < numToSend)
226 result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent);
228 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
229 XE_FAIL("Connection closed");
230 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
231 XE_FAIL("Connection terminated");
232 else if (result == DE_SOCKETRESULT_ERROR)
233 XE_FAIL("Socket error");
234 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
236 // \note Socket should not be in non-blocking mode.
237 DE_ASSERT(numSent == 0);
241 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
245 catch (const std::exception& e)
247 m_state.setState(COMMLINKSTATE_ERROR, e.what());
251 void TcpIpSendThread::stop (void)
263 TcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state)
267 , m_isRunning (false)
271 TcpIpRecvThread::~TcpIpRecvThread (void)
275 void TcpIpRecvThread::start (void)
277 DE_ASSERT(!m_isRunning);
286 void TcpIpRecvThread::run (void)
292 bool hasHeader = m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
293 bool hasPayload = false;
294 size_t messageSize = 0;
295 xs::MessageType messageType = (xs::MessageType)0;
299 xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
300 hasPayload = m_curMsgPos >= messageSize;
306 handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE);
311 // Try to receive missing bytes.
312 size_t curSize = hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE;
313 size_t bytesToRecv = curSize-m_curMsgPos;
315 deSocketResult result = DE_SOCKETRESULT_LAST;
317 if (m_curMsgBuf.size() < curSize)
318 m_curMsgBuf.resize(curSize);
320 result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
322 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
323 XE_FAIL("Connection closed");
324 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
325 XE_FAIL("Connection terminated");
326 else if (result == DE_SOCKETRESULT_ERROR)
327 XE_FAIL("Socket error");
328 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
330 // \note Socket should not be in non-blocking mode.
331 DE_ASSERT(numRecv == 0);
336 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
337 DE_ASSERT(numRecv <= bytesToRecv);
338 m_curMsgPos += numRecv;
339 // Continue receiving bytes / handle message in next iter.
344 catch (const std::exception& e)
346 m_state.setState(COMMLINKSTATE_ERROR, e.what());
350 void TcpIpRecvThread::stop (void)
354 // \note Socket must be closed before terminating receive thread.
355 XE_CHECK(!m_socket.isReceiveOpen());
362 void TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, size_t dataSize)
366 case xs::MESSAGETYPE_KEEPALIVE:
367 m_state.onKeepaliveReceived();
370 case xs::MESSAGETYPE_PROCESS_STARTED:
371 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
372 m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
375 case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
377 xs::ProcessLaunchFailedMessage msg(data, dataSize);
378 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message");
379 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
383 case xs::MESSAGETYPE_PROCESS_FINISHED:
385 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
386 xs::ProcessFinishedMessage msg(data, dataSize);
387 m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
388 DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
392 case xs::MESSAGETYPE_PROCESS_LOG_DATA:
393 case xs::MESSAGETYPE_INFO:
394 // Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
395 if (data[dataSize-1] == 0)
398 if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
400 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message");
401 m_state.onTestLogData(&data[0], dataSize);
404 m_state.onInfoLogData(&data[0], dataSize);
408 XE_FAIL("Unknown message");
414 TcpIpLink::TcpIpLink (void)
415 : m_state (COMMLINKSTATE_ERROR, "Not connected")
416 , m_sendThread (m_socket, m_state)
417 , m_recvThread (m_socket, m_state)
418 , m_keepaliveTimer (DE_NULL)
420 m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
421 XE_CHECK(m_keepaliveTimer);
424 TcpIpLink::~TcpIpLink (void)
432 // Can't do much except to ignore error.
434 deTimer_destroy(m_keepaliveTimer);
437 void TcpIpLink::closeConnection (void)
440 deSocketState state = m_socket.getState();
441 if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
445 if (deTimer_isActive(m_keepaliveTimer))
446 deTimer_disable(m_keepaliveTimer);
448 if (m_sendThread.isRunning())
451 if (m_recvThread.isRunning())
454 if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
458 void TcpIpLink::connect (const de::SocketAddress& address)
460 XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
461 XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
462 XE_CHECK(!m_sendThread.isRunning());
463 XE_CHECK(!m_recvThread.isRunning());
465 m_socket.connect(address);
469 // Clear error and set state to ready.
470 m_state.setState(COMMLINKSTATE_READY, "");
471 m_state.onKeepaliveReceived();
474 m_sendThread.start();
475 m_recvThread.start();
477 XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
479 catch (const std::exception& e)
482 m_state.setState(COMMLINKSTATE_ERROR, e.what());
487 void TcpIpLink::disconnect (void)
492 m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
494 catch (const std::exception& e)
496 m_state.setState(COMMLINKSTATE_ERROR, e.what());
500 void TcpIpLink::reset (void)
502 // \note Just clears error state if we are connected.
503 if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
505 m_state.setState(COMMLINKSTATE_READY, "");
507 // \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
510 disconnect(); // Abnormal state/usage. Disconnect socket.
513 void TcpIpLink::keepaliveTimerCallback (void* ptr)
515 TcpIpLink* link = static_cast<TcpIpLink*>(ptr);
516 deUint64 lastKeepalive = link->m_state.getLastKeepaliveRecevied();
517 deUint64 curTime = deGetMicroseconds();
519 // Check for timeout.
520 if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000)
521 link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
523 // Enqueue new keepalive.
526 writeKeepalive(link->m_sendThread.getBuffer());
528 catch (const de::BlockBuffer<deUint8>::CanceledException&)
530 // Ignore. Can happen in connection teardown.
534 CommLinkState TcpIpLink::getState (void) const
536 return m_state.getState();
539 CommLinkState TcpIpLink::getState (std::string& message) const
541 return m_state.getState(message);
544 void TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr)
546 m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
549 void TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList)
551 XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
553 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
554 writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
557 void TcpIpLink::stopTestProcess (void)
559 XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
560 writeStopExecution(m_sendThread.getBuffer());