From 5c29e1483e0ca803c8be3ea6a0a8cfe899b2a813 Mon Sep 17 00:00:00 2001 From: John Salem Date: Wed, 5 Aug 2020 14:39:45 -0700 Subject: [PATCH] [test] fix hang in macOS variants of reverse diagnostics server tests (#40225) Co-authored-by: Noah Falk --- .../src/debug/debug-pal/unix/diagnosticsipc.cpp | 17 ++++++----- src/coreclr/src/debug/inc/diagnosticsipc.h | 7 +++-- src/coreclr/src/vm/diagnosticserver.cpp | 2 ++ src/coreclr/src/vm/ipcstreamfactory.cpp | 33 ++++++++++++++++++++++ src/tests/tracing/eventpipe/common/IpcUtils.cs | 2 +- src/tests/tracing/eventpipe/common/Reverse.cs | 22 +++++---------- .../tracing/eventpipe/reverseouter/reverseouter.cs | 10 +++++-- 7 files changed, 66 insertions(+), 27 deletions(-) diff --git a/src/coreclr/src/debug/debug-pal/unix/diagnosticsipc.cpp b/src/coreclr/src/debug/debug-pal/unix/diagnosticsipc.cpp index 632ac03..a718c2b 100644 --- a/src/coreclr/src/debug/debug-pal/unix/diagnosticsipc.cpp +++ b/src/coreclr/src/debug/debug-pal/unix/diagnosticsipc.cpp @@ -246,6 +246,8 @@ int32_t IpcStream::DiagnosticsIpc::Poll(IpcPollHandle *rgIpcPollHandles, uint32_ { if (pollfds[i].revents != 0) { + if (callback != nullptr) + callback("IpcStream::DiagnosticsIpc::Poll - poll revents", (uint32_t)pollfds[i].revents); // error check FIRST if (pollfds[i].revents & POLLHUP) { @@ -253,21 +255,22 @@ int32_t IpcStream::DiagnosticsIpc::Poll(IpcPollHandle *rgIpcPollHandles, uint32_ // will technically meet the requirements for POLLIN // i.e., a call to recv/read won't block rgIpcPollHandles[i].revents = (uint8_t)PollEvents::HANGUP; - delete[] pollfds; - return -1; } else if ((pollfds[i].revents & (POLLERR|POLLNVAL))) { if (callback != nullptr) callback("Poll error", (uint32_t)pollfds[i].revents); rgIpcPollHandles[i].revents = (uint8_t)PollEvents::ERR; - delete[] pollfds; - return -1; } else if (pollfds[i].revents & (POLLIN|POLLPRI)) { rgIpcPollHandles[i].revents = (uint8_t)PollEvents::SIGNALED; - break; + } + else + { + rgIpcPollHandles[i].revents = (uint8_t)PollEvents::UNKNOWN; + if (callback != nullptr) + callback("unkown poll response", (uint32_t)pollfds[i].revents); } } } @@ -341,7 +344,7 @@ bool IpcStream::Read(void *lpBuffer, const uint32_t nBytesToRead, uint32_t &nByt pfd.fd = _clientSocket; pfd.events = POLLIN; int retval = poll(&pfd, 1, timeoutMs); - if (retval <= 0 || pfd.revents != POLLIN) + if (retval <= 0 || !(pfd.revents & POLLIN)) { // timeout or error return false; @@ -382,7 +385,7 @@ bool IpcStream::Write(const void *lpBuffer, const uint32_t nBytesToWrite, uint32 pfd.fd = _clientSocket; pfd.events = POLLOUT; int retval = poll(&pfd, 1, timeoutMs); - if (retval <= 0 || pfd.revents != POLLOUT) + if (retval <= 0 || !(pfd.revents & POLLOUT)) { // timeout or error return false; diff --git a/src/coreclr/src/debug/inc/diagnosticsipc.h b/src/coreclr/src/debug/inc/diagnosticsipc.h index 99d670c..ecbf9db 100644 --- a/src/coreclr/src/debug/inc/diagnosticsipc.h +++ b/src/coreclr/src/debug/inc/diagnosticsipc.h @@ -16,6 +16,7 @@ typedef void (*ErrorCallback)(const char *szMessage, uint32_t code); class IpcStream final { + friend class IpcStreamFactory; public: static constexpr int32_t InfiniteTimeout = -1; ~IpcStream(); @@ -26,6 +27,7 @@ public: class DiagnosticsIpc final { + friend class IpcStreamFactory; public: enum ConnectionMode { @@ -38,7 +40,8 @@ public: NONE = 0x00, // no events SIGNALED = 0x01, // ready for use HANGUP = 0x02, // connection remotely closed - ERR = 0x04 // other error + ERR = 0x04, // error + UNKNOWN = 0x80 // unknown state }; // The bookeeping struct used for polling on server and client structs @@ -125,7 +128,7 @@ public: private: #ifdef TARGET_UNIX int _clientSocket = -1; - IpcStream(int clientSocket, int serverSocket, DiagnosticsIpc::ConnectionMode mode = DiagnosticsIpc::ConnectionMode::SERVER) + IpcStream(int clientSocket, DiagnosticsIpc::ConnectionMode mode = DiagnosticsIpc::ConnectionMode::SERVER) : _clientSocket(clientSocket), _mode(mode) {} #else HANDLE _hPipe = INVALID_HANDLE_VALUE; diff --git a/src/coreclr/src/vm/diagnosticserver.cpp b/src/coreclr/src/vm/diagnosticserver.cpp index b04def7..7688cab 100644 --- a/src/coreclr/src/vm/diagnosticserver.cpp +++ b/src/coreclr/src/vm/diagnosticserver.cpp @@ -74,6 +74,8 @@ DWORD WINAPI DiagnosticServer::DiagnosticsServerThread(LPVOID) continue; } + STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "DiagnosticServer - received IPC message with command set (%d) and command id (%d)\n", message.GetHeader().CommandSet, message.GetHeader().CommandId); + switch ((DiagnosticsIpc::DiagnosticServerCommandSet)message.GetHeader().CommandSet) { case DiagnosticsIpc::DiagnosticServerCommandSet::EventPipe: diff --git a/src/coreclr/src/vm/ipcstreamfactory.cpp b/src/coreclr/src/vm/ipcstreamfactory.cpp index d24f14c..6a82f16 100644 --- a/src/coreclr/src/vm/ipcstreamfactory.cpp +++ b/src/coreclr/src/vm/ipcstreamfactory.cpp @@ -12,8 +12,10 @@ Volatile IpcStreamFactory::s_isShutdown = false; bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::DiagnosticsIpc::IpcPollHandle *pIpcPollHandle, ErrorCallback callback) { + STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO1000, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - ENTER.\n"); if (_pStream == nullptr) { + STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - cache was empty!\n"); // cache is empty, reconnect, e.g., there was a disconnect IpcStream *pConnection = _pIpc->Connect(callback); if (pConnection == nullptr) @@ -22,6 +24,11 @@ bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::Diagno callback("Failed to connect to client connection", -1); return false; } +#ifdef TARGET_UNIX + STRESS_LOG1(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - returned connection { _clientSocket = %d }\n", pConnection->_clientSocket); +#else + STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - returned connection { _hPipe = %d, _oOverlap.hEvent = %d }\n", pConnection->_hPipe, pConnection->_oOverlap.hEvent); +#endif if (!DiagnosticsIpc::SendIpcAdvertise_V1(pConnection)) { if (callback != nullptr) @@ -33,6 +40,7 @@ bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::Diagno _pStream = pConnection; } *pIpcPollHandle = { nullptr, _pStream, 0, this }; + STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - EXIT.\n"); return true; } @@ -139,6 +147,7 @@ int32_t IpcStreamFactory::GetNextTimeout(int32_t currentTimeoutMs) IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback) { + STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - ENTER"); IpcStream *pStream = nullptr; CQuickArrayList rgIpcPollHandles; @@ -168,6 +177,25 @@ IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback) nPollAttempts++; STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - Poll attempt: %d, timeout: %dms.\n", nPollAttempts, pollTimeoutMs); + for (uint32_t i = 0; i < rgIpcPollHandles.Size(); i++) + { + if (rgIpcPollHandles[i].pIpc != nullptr) + { +#ifdef TARGET_UNIX + STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tSERVER IpcPollHandle[%d] = { _serverSocket = %d }\n", i, rgIpcPollHandles[i].pIpc->_serverSocket); +#else + STRESS_LOG3(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tSERVER IpcPollHandle[%d] = { _hPipe = %d, _oOverlap.hEvent = %d }\n", i, rgIpcPollHandles[i].pIpc->_hPipe, rgIpcPollHandles[i].pIpc->_oOverlap.hEvent); +#endif + } + else + { +#ifdef TARGET_UNIX + STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tCLIENT IpcPollHandle[%d] = { _clientSocket = %d }\n", i, rgIpcPollHandles[i].pStream->_clientSocket); +#else + STRESS_LOG3(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tCLIENT IpcPollHandle[%d] = { _hPipe = %d, _oOverlap.hEvent = %d }\n", i, rgIpcPollHandles[i].pStream->_hPipe, rgIpcPollHandles[i].pStream->_oOverlap.hEvent); +#endif + } + } int32_t retval = IpcStream::DiagnosticsIpc::Poll(rgIpcPollHandles.Ptr(), (uint32_t)rgIpcPollHandles.Size(), pollTimeoutMs, callback); bool fSawError = false; @@ -211,6 +239,11 @@ IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback) rgIpcPollHandles.Pop(); } +#ifdef TARGET_UNIX + STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - EXIT :: Poll attempt: %d, stream using handle %d.\n", nPollAttempts, pStream->_clientSocket); +#else + STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - EXIT :: Poll attempt: %d, stream using handle %d.\n", nPollAttempts, pStream->_hPipe); +#endif return pStream; } diff --git a/src/tests/tracing/eventpipe/common/IpcUtils.cs b/src/tests/tracing/eventpipe/common/IpcUtils.cs index bf17e1e..2f39237 100644 --- a/src/tests/tracing/eventpipe/common/IpcUtils.cs +++ b/src/tests/tracing/eventpipe/common/IpcUtils.cs @@ -70,7 +70,7 @@ namespace Tracing.Tests.Common foreach ((string key, string value) in environment) process.StartInfo.Environment.Add(key, value); process.StartInfo.FileName = Process.GetCurrentProcess().MainModule.FileName; - process.StartInfo.Arguments = new Uri(currentAssembly.CodeBase).LocalPath + " 0"; + process.StartInfo.Arguments = new Uri(currentAssembly.Location).LocalPath + " 0"; process.StartInfo.RedirectStandardOutput = true; process.StartInfo.RedirectStandardInput = true; process.StartInfo.RedirectStandardError = true; diff --git a/src/tests/tracing/eventpipe/common/Reverse.cs b/src/tests/tracing/eventpipe/common/Reverse.cs index 1b99f91..19c9b21 100644 --- a/src/tests/tracing/eventpipe/common/Reverse.cs +++ b/src/tests/tracing/eventpipe/common/Reverse.cs @@ -102,7 +102,6 @@ namespace Tracing.Tests.Common socket.ReceiveBufferSize = Math.Max(bufferSize, 128); socket.Bind(remoteEP); socket.Listen(255); - socket.LingerState.Enabled = false; _server = socket; } } @@ -166,20 +165,13 @@ namespace Tracing.Tests.Common } break; case Socket socket: - try - { - socket.Shutdown(SocketShutdown.Both); - } - catch {} - finally - { - _clientSocket?.Close(); - socket.Close(); - socket.Dispose(); - _clientSocket?.Dispose(); - if (File.Exists(_serverAddress)) - File.Delete(_serverAddress); - } + if (File.Exists(_serverAddress)) + File.Delete(_serverAddress); + socket.Close(); + socket.Dispose(); + _clientSocket?.Shutdown(SocketShutdown.Both); + _clientSocket?.Close(); + _clientSocket?.Dispose(); break; default: throw new ArgumentException("Invalid server type"); diff --git a/src/tests/tracing/eventpipe/reverseouter/reverseouter.cs b/src/tests/tracing/eventpipe/reverseouter/reverseouter.cs index ff01fbc..15eadb0 100644 --- a/src/tests/tracing/eventpipe/reverseouter/reverseouter.cs +++ b/src/tests/tracing/eventpipe/reverseouter/reverseouter.cs @@ -47,16 +47,22 @@ namespace Tracing.Tests.ReverseValidation Logger.logger.Log("Starting EventPipeSession over standard connection"); using Stream stream = EventPipeClient.CollectTracing(pid, config, out var sessionId); Logger.logger.Log($"Started EventPipeSession over standard connection with session id: 0x{sessionId:x}"); - using var source = new EventPipeEventSource(stream); - Task readerTask = Task.Run(() => source.Process()); + // using var source = new EventPipeEventSource(stream); + using var memroyStream = new MemoryStream(); + Task readerTask = stream.CopyToAsync(memroyStream);//Task.Run(() => source.Process()); await Task.Delay(500); Logger.logger.Log("Stopping EventPipeSession over standard connection"); EventPipeClient.StopTracing(pid, sessionId); await readerTask; Logger.logger.Log("Stopped EventPipeSession over standard connection"); } + catch (Exception e) + { + Logger.logger.Log(e.ToString()); + } finally { + Logger.logger.Log("setting the MRE"); mre.Set(); } }); -- 2.7.4