[test] fix hang in macOS variants of reverse diagnostics server tests (#40225)
authorJohn Salem <josalem@microsoft.com>
Wed, 5 Aug 2020 21:39:45 +0000 (14:39 -0700)
committerGitHub <noreply@github.com>
Wed, 5 Aug 2020 21:39:45 +0000 (14:39 -0700)
Co-authored-by: Noah Falk <noahfalk@users.noreply.github.com>
src/coreclr/src/debug/debug-pal/unix/diagnosticsipc.cpp
src/coreclr/src/debug/inc/diagnosticsipc.h
src/coreclr/src/vm/diagnosticserver.cpp
src/coreclr/src/vm/ipcstreamfactory.cpp
src/tests/tracing/eventpipe/common/IpcUtils.cs
src/tests/tracing/eventpipe/common/Reverse.cs
src/tests/tracing/eventpipe/reverseouter/reverseouter.cs

index 632ac03..a718c2b 100644 (file)
@@ -246,6 +246,8 @@ int32_t IpcStream::DiagnosticsIpc::Poll(IpcPollHandle *rgIpcPollHandles, uint32_
     {
         if (pollfds[i].revents != 0)
         {
+            if (callback != nullptr)
+                callback("IpcStream::DiagnosticsIpc::Poll - poll revents", (uint32_t)pollfds[i].revents);
             // error check FIRST
             if (pollfds[i].revents & POLLHUP)
             {
@@ -253,21 +255,22 @@ int32_t IpcStream::DiagnosticsIpc::Poll(IpcPollHandle *rgIpcPollHandles, uint32_
                 // will technically meet the requirements for POLLIN
                 // i.e., a call to recv/read won't block
                 rgIpcPollHandles[i].revents = (uint8_t)PollEvents::HANGUP;
-                delete[] pollfds;
-                return -1;
             }
             else if ((pollfds[i].revents & (POLLERR|POLLNVAL)))
             {
                 if (callback != nullptr)
                     callback("Poll error", (uint32_t)pollfds[i].revents);
                 rgIpcPollHandles[i].revents = (uint8_t)PollEvents::ERR;
-                delete[] pollfds;
-                return -1;
             }
             else if (pollfds[i].revents & (POLLIN|POLLPRI))
             {
                 rgIpcPollHandles[i].revents = (uint8_t)PollEvents::SIGNALED;
-                break;
+            }
+            else
+            {
+                rgIpcPollHandles[i].revents = (uint8_t)PollEvents::UNKNOWN;
+                if (callback != nullptr)
+                    callback("unkown poll response", (uint32_t)pollfds[i].revents);
             }
         }
     }
@@ -341,7 +344,7 @@ bool IpcStream::Read(void *lpBuffer, const uint32_t nBytesToRead, uint32_t &nByt
         pfd.fd = _clientSocket;
         pfd.events = POLLIN;
         int retval = poll(&pfd, 1, timeoutMs);
-        if (retval <= 0 || pfd.revents != POLLIN)
+        if (retval <= 0 || !(pfd.revents & POLLIN))
         {
             // timeout or error
             return false;
@@ -382,7 +385,7 @@ bool IpcStream::Write(const void *lpBuffer, const uint32_t nBytesToWrite, uint32
         pfd.fd = _clientSocket;
         pfd.events = POLLOUT;
         int retval = poll(&pfd, 1, timeoutMs);
-        if (retval <= 0 || pfd.revents != POLLOUT)
+        if (retval <= 0 || !(pfd.revents & POLLOUT))
         {
             // timeout or error
             return false;
index 99d670c..ecbf9db 100644 (file)
@@ -16,6 +16,7 @@ typedef void (*ErrorCallback)(const char *szMessage, uint32_t code);
 
 class IpcStream final
 {
+    friend class IpcStreamFactory;
 public:
     static constexpr int32_t InfiniteTimeout = -1;
     ~IpcStream();
@@ -26,6 +27,7 @@ public:
 
     class DiagnosticsIpc final
     {
+        friend class IpcStreamFactory;
     public:
         enum ConnectionMode
         {
@@ -38,7 +40,8 @@ public:
             NONE     = 0x00, // no events
             SIGNALED = 0x01, // ready for use
             HANGUP   = 0x02, // connection remotely closed
-            ERR      = 0x04  // other error
+            ERR      = 0x04, // error
+            UNKNOWN   = 0x80  // unknown state
         };
 
         // The bookeeping struct used for polling on server and client structs
@@ -125,7 +128,7 @@ public:
 private:
 #ifdef TARGET_UNIX
     int _clientSocket = -1;
-    IpcStream(int clientSocket, int serverSocket, DiagnosticsIpc::ConnectionMode mode = DiagnosticsIpc::ConnectionMode::SERVER)
+    IpcStream(int clientSocket, DiagnosticsIpc::ConnectionMode mode = DiagnosticsIpc::ConnectionMode::SERVER)
         : _clientSocket(clientSocket), _mode(mode) {}
 #else
     HANDLE _hPipe = INVALID_HANDLE_VALUE;
index b04def7..7688cab 100644 (file)
@@ -74,6 +74,8 @@ DWORD WINAPI DiagnosticServer::DiagnosticsServerThread(LPVOID)
                 continue;
             }
 
+            STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "DiagnosticServer - received IPC message with command set (%d) and command id (%d)\n", message.GetHeader().CommandSet, message.GetHeader().CommandId);
+
             switch ((DiagnosticsIpc::DiagnosticServerCommandSet)message.GetHeader().CommandSet)
             {
             case DiagnosticsIpc::DiagnosticServerCommandSet::EventPipe:
index d24f14c..6a82f16 100644 (file)
@@ -12,8 +12,10 @@ Volatile<bool> IpcStreamFactory::s_isShutdown = false;
 
 bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::DiagnosticsIpc::IpcPollHandle *pIpcPollHandle, ErrorCallback callback)
 {
+    STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO1000, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - ENTER.\n");
     if (_pStream == nullptr)
     {
+        STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - cache was empty!\n");
         // cache is empty, reconnect, e.g., there was a disconnect
         IpcStream *pConnection = _pIpc->Connect(callback);
         if (pConnection == nullptr)
@@ -22,6 +24,11 @@ bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::Diagno
                 callback("Failed to connect to client connection", -1);
             return false;
         }
+#ifdef TARGET_UNIX
+        STRESS_LOG1(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - returned connection { _clientSocket = %d }\n", pConnection->_clientSocket);
+#else
+        STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - returned connection { _hPipe = %d, _oOverlap.hEvent = %d }\n", pConnection->_hPipe, pConnection->_oOverlap.hEvent);
+#endif
         if (!DiagnosticsIpc::SendIpcAdvertise_V1(pConnection))
         {
             if (callback != nullptr)
@@ -33,6 +40,7 @@ bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::Diagno
         _pStream = pConnection;
     }
     *pIpcPollHandle = { nullptr, _pStream, 0, this };
+    STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - EXIT.\n");
     return true;
 }
 
@@ -139,6 +147,7 @@ int32_t IpcStreamFactory::GetNextTimeout(int32_t currentTimeoutMs)
 
 IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback)
 {
+    STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - ENTER");
     IpcStream *pStream = nullptr;
     CQuickArrayList<IpcStream::DiagnosticsIpc::IpcPollHandle> rgIpcPollHandles;
 
@@ -168,6 +177,25 @@ IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback)
 
         nPollAttempts++;
         STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - Poll attempt: %d, timeout: %dms.\n", nPollAttempts, pollTimeoutMs);
+        for (uint32_t i = 0; i < rgIpcPollHandles.Size(); i++)
+        {
+            if (rgIpcPollHandles[i].pIpc != nullptr)
+            {
+#ifdef TARGET_UNIX
+                STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tSERVER IpcPollHandle[%d] = { _serverSocket = %d }\n", i, rgIpcPollHandles[i].pIpc->_serverSocket);
+#else
+                STRESS_LOG3(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tSERVER IpcPollHandle[%d] = { _hPipe = %d, _oOverlap.hEvent = %d }\n", i, rgIpcPollHandles[i].pIpc->_hPipe, rgIpcPollHandles[i].pIpc->_oOverlap.hEvent);
+#endif
+            }
+            else
+            {
+#ifdef TARGET_UNIX
+                STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tCLIENT IpcPollHandle[%d] = { _clientSocket = %d }\n", i, rgIpcPollHandles[i].pStream->_clientSocket);
+#else
+                STRESS_LOG3(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tCLIENT IpcPollHandle[%d] = { _hPipe = %d, _oOverlap.hEvent = %d }\n", i, rgIpcPollHandles[i].pStream->_hPipe, rgIpcPollHandles[i].pStream->_oOverlap.hEvent);
+#endif
+            }
+        }
         int32_t retval = IpcStream::DiagnosticsIpc::Poll(rgIpcPollHandles.Ptr(), (uint32_t)rgIpcPollHandles.Size(), pollTimeoutMs, callback);
         bool fSawError = false;
 
@@ -211,6 +239,11 @@ IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback)
             rgIpcPollHandles.Pop();
     }
 
+#ifdef TARGET_UNIX
+    STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - EXIT :: Poll attempt: %d, stream using handle %d.\n", nPollAttempts, pStream->_clientSocket);
+#else
+    STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - EXIT :: Poll attempt: %d, stream using handle %d.\n", nPollAttempts, pStream->_hPipe);
+#endif
     return pStream;
 }
 
index bf17e1e..2f39237 100644 (file)
@@ -70,7 +70,7 @@ namespace Tracing.Tests.Common
                 foreach ((string key, string value) in environment)
                     process.StartInfo.Environment.Add(key, value);
                 process.StartInfo.FileName = Process.GetCurrentProcess().MainModule.FileName;
-                process.StartInfo.Arguments = new Uri(currentAssembly.CodeBase).LocalPath + " 0";
+                process.StartInfo.Arguments = new Uri(currentAssembly.Location).LocalPath + " 0";
                 process.StartInfo.RedirectStandardOutput = true;
                 process.StartInfo.RedirectStandardInput = true;
                 process.StartInfo.RedirectStandardError = true;
index 1b99f91..19c9b21 100644 (file)
@@ -102,7 +102,6 @@ namespace Tracing.Tests.Common
                 socket.ReceiveBufferSize = Math.Max(bufferSize, 128);
                 socket.Bind(remoteEP);
                 socket.Listen(255);
-                socket.LingerState.Enabled = false;
                 _server = socket;
             }
         }
@@ -166,20 +165,13 @@ namespace Tracing.Tests.Common
                     }
                     break;
                 case Socket socket:
-                    try
-                    {
-                        socket.Shutdown(SocketShutdown.Both);
-                    }
-                    catch {}
-                    finally
-                    {
-                        _clientSocket?.Close();
-                        socket.Close();
-                        socket.Dispose();
-                        _clientSocket?.Dispose();
-                        if (File.Exists(_serverAddress))
-                            File.Delete(_serverAddress);
-                    }
+                    if (File.Exists(_serverAddress))
+                        File.Delete(_serverAddress);
+                    socket.Close();
+                    socket.Dispose();
+                    _clientSocket?.Shutdown(SocketShutdown.Both);
+                    _clientSocket?.Close();
+                    _clientSocket?.Dispose();
                     break;
                 default:
                     throw new ArgumentException("Invalid server type");
index ff01fbc..15eadb0 100644 (file)
@@ -47,16 +47,22 @@ namespace Tracing.Tests.ReverseValidation
                             Logger.logger.Log("Starting EventPipeSession over standard connection");
                             using Stream stream = EventPipeClient.CollectTracing(pid, config, out var sessionId);
                             Logger.logger.Log($"Started EventPipeSession over standard connection with session id: 0x{sessionId:x}");
-                            using var source = new EventPipeEventSource(stream);
-                            Task readerTask = Task.Run(() => source.Process());
+                            // using var source = new EventPipeEventSource(stream);
+                            using var memroyStream = new MemoryStream();
+                            Task readerTask = stream.CopyToAsync(memroyStream);//Task.Run(() => source.Process());
                             await Task.Delay(500);
                             Logger.logger.Log("Stopping EventPipeSession over standard connection");
                             EventPipeClient.StopTracing(pid, sessionId);
                             await readerTask;
                             Logger.logger.Log("Stopped EventPipeSession over standard connection");
                         }
+                        catch (Exception e)
+                        {
+                            Logger.logger.Log(e.ToString());
+                        }
                         finally
                         {
+                            Logger.logger.Log("setting the MRE");
                             mre.Set();
                         }
                     });