{
if (pollfds[i].revents != 0)
{
+ if (callback != nullptr)
+ callback("IpcStream::DiagnosticsIpc::Poll - poll revents", (uint32_t)pollfds[i].revents);
// error check FIRST
if (pollfds[i].revents & POLLHUP)
{
// will technically meet the requirements for POLLIN
// i.e., a call to recv/read won't block
rgIpcPollHandles[i].revents = (uint8_t)PollEvents::HANGUP;
- delete[] pollfds;
- return -1;
}
else if ((pollfds[i].revents & (POLLERR|POLLNVAL)))
{
if (callback != nullptr)
callback("Poll error", (uint32_t)pollfds[i].revents);
rgIpcPollHandles[i].revents = (uint8_t)PollEvents::ERR;
- delete[] pollfds;
- return -1;
}
else if (pollfds[i].revents & (POLLIN|POLLPRI))
{
rgIpcPollHandles[i].revents = (uint8_t)PollEvents::SIGNALED;
- break;
+ }
+ else
+ {
+ rgIpcPollHandles[i].revents = (uint8_t)PollEvents::UNKNOWN;
+ if (callback != nullptr)
+ callback("unkown poll response", (uint32_t)pollfds[i].revents);
}
}
}
pfd.fd = _clientSocket;
pfd.events = POLLIN;
int retval = poll(&pfd, 1, timeoutMs);
- if (retval <= 0 || pfd.revents != POLLIN)
+ if (retval <= 0 || !(pfd.revents & POLLIN))
{
// timeout or error
return false;
pfd.fd = _clientSocket;
pfd.events = POLLOUT;
int retval = poll(&pfd, 1, timeoutMs);
- if (retval <= 0 || pfd.revents != POLLOUT)
+ if (retval <= 0 || !(pfd.revents & POLLOUT))
{
// timeout or error
return false;
class IpcStream final
{
+ friend class IpcStreamFactory;
public:
static constexpr int32_t InfiniteTimeout = -1;
~IpcStream();
class DiagnosticsIpc final
{
+ friend class IpcStreamFactory;
public:
enum ConnectionMode
{
NONE = 0x00, // no events
SIGNALED = 0x01, // ready for use
HANGUP = 0x02, // connection remotely closed
- ERR = 0x04 // other error
+ ERR = 0x04, // error
+ UNKNOWN = 0x80 // unknown state
};
// The bookeeping struct used for polling on server and client structs
private:
#ifdef TARGET_UNIX
int _clientSocket = -1;
- IpcStream(int clientSocket, int serverSocket, DiagnosticsIpc::ConnectionMode mode = DiagnosticsIpc::ConnectionMode::SERVER)
+ IpcStream(int clientSocket, DiagnosticsIpc::ConnectionMode mode = DiagnosticsIpc::ConnectionMode::SERVER)
: _clientSocket(clientSocket), _mode(mode) {}
#else
HANDLE _hPipe = INVALID_HANDLE_VALUE;
continue;
}
+ STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "DiagnosticServer - received IPC message with command set (%d) and command id (%d)\n", message.GetHeader().CommandSet, message.GetHeader().CommandId);
+
switch ((DiagnosticsIpc::DiagnosticServerCommandSet)message.GetHeader().CommandSet)
{
case DiagnosticsIpc::DiagnosticServerCommandSet::EventPipe:
bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::DiagnosticsIpc::IpcPollHandle *pIpcPollHandle, ErrorCallback callback)
{
+ STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO1000, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - ENTER.\n");
if (_pStream == nullptr)
{
+ STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - cache was empty!\n");
// cache is empty, reconnect, e.g., there was a disconnect
IpcStream *pConnection = _pIpc->Connect(callback);
if (pConnection == nullptr)
callback("Failed to connect to client connection", -1);
return false;
}
+#ifdef TARGET_UNIX
+ STRESS_LOG1(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - returned connection { _clientSocket = %d }\n", pConnection->_clientSocket);
+#else
+ STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - returned connection { _hPipe = %d, _oOverlap.hEvent = %d }\n", pConnection->_hPipe, pConnection->_oOverlap.hEvent);
+#endif
if (!DiagnosticsIpc::SendIpcAdvertise_V1(pConnection))
{
if (callback != nullptr)
_pStream = pConnection;
}
*pIpcPollHandle = { nullptr, _pStream, 0, this };
+ STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - EXIT.\n");
return true;
}
IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback)
{
+ STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - ENTER");
IpcStream *pStream = nullptr;
CQuickArrayList<IpcStream::DiagnosticsIpc::IpcPollHandle> rgIpcPollHandles;
nPollAttempts++;
STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - Poll attempt: %d, timeout: %dms.\n", nPollAttempts, pollTimeoutMs);
+ for (uint32_t i = 0; i < rgIpcPollHandles.Size(); i++)
+ {
+ if (rgIpcPollHandles[i].pIpc != nullptr)
+ {
+#ifdef TARGET_UNIX
+ STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tSERVER IpcPollHandle[%d] = { _serverSocket = %d }\n", i, rgIpcPollHandles[i].pIpc->_serverSocket);
+#else
+ STRESS_LOG3(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tSERVER IpcPollHandle[%d] = { _hPipe = %d, _oOverlap.hEvent = %d }\n", i, rgIpcPollHandles[i].pIpc->_hPipe, rgIpcPollHandles[i].pIpc->_oOverlap.hEvent);
+#endif
+ }
+ else
+ {
+#ifdef TARGET_UNIX
+ STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tCLIENT IpcPollHandle[%d] = { _clientSocket = %d }\n", i, rgIpcPollHandles[i].pStream->_clientSocket);
+#else
+ STRESS_LOG3(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tCLIENT IpcPollHandle[%d] = { _hPipe = %d, _oOverlap.hEvent = %d }\n", i, rgIpcPollHandles[i].pStream->_hPipe, rgIpcPollHandles[i].pStream->_oOverlap.hEvent);
+#endif
+ }
+ }
int32_t retval = IpcStream::DiagnosticsIpc::Poll(rgIpcPollHandles.Ptr(), (uint32_t)rgIpcPollHandles.Size(), pollTimeoutMs, callback);
bool fSawError = false;
rgIpcPollHandles.Pop();
}
+#ifdef TARGET_UNIX
+ STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - EXIT :: Poll attempt: %d, stream using handle %d.\n", nPollAttempts, pStream->_clientSocket);
+#else
+ STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - EXIT :: Poll attempt: %d, stream using handle %d.\n", nPollAttempts, pStream->_hPipe);
+#endif
return pStream;
}
foreach ((string key, string value) in environment)
process.StartInfo.Environment.Add(key, value);
process.StartInfo.FileName = Process.GetCurrentProcess().MainModule.FileName;
- process.StartInfo.Arguments = new Uri(currentAssembly.CodeBase).LocalPath + " 0";
+ process.StartInfo.Arguments = new Uri(currentAssembly.Location).LocalPath + " 0";
process.StartInfo.RedirectStandardOutput = true;
process.StartInfo.RedirectStandardInput = true;
process.StartInfo.RedirectStandardError = true;
socket.ReceiveBufferSize = Math.Max(bufferSize, 128);
socket.Bind(remoteEP);
socket.Listen(255);
- socket.LingerState.Enabled = false;
_server = socket;
}
}
}
break;
case Socket socket:
- try
- {
- socket.Shutdown(SocketShutdown.Both);
- }
- catch {}
- finally
- {
- _clientSocket?.Close();
- socket.Close();
- socket.Dispose();
- _clientSocket?.Dispose();
- if (File.Exists(_serverAddress))
- File.Delete(_serverAddress);
- }
+ if (File.Exists(_serverAddress))
+ File.Delete(_serverAddress);
+ socket.Close();
+ socket.Dispose();
+ _clientSocket?.Shutdown(SocketShutdown.Both);
+ _clientSocket?.Close();
+ _clientSocket?.Dispose();
break;
default:
throw new ArgumentException("Invalid server type");
Logger.logger.Log("Starting EventPipeSession over standard connection");
using Stream stream = EventPipeClient.CollectTracing(pid, config, out var sessionId);
Logger.logger.Log($"Started EventPipeSession over standard connection with session id: 0x{sessionId:x}");
- using var source = new EventPipeEventSource(stream);
- Task readerTask = Task.Run(() => source.Process());
+ // using var source = new EventPipeEventSource(stream);
+ using var memroyStream = new MemoryStream();
+ Task readerTask = stream.CopyToAsync(memroyStream);//Task.Run(() => source.Process());
await Task.Delay(500);
Logger.logger.Log("Stopping EventPipeSession over standard connection");
EventPipeClient.StopTracing(pid, sessionId);
await readerTask;
Logger.logger.Log("Stopped EventPipeSession over standard connection");
}
+ catch (Exception e)
+ {
+ Logger.logger.Log(e.ToString());
+ }
finally
{
+ Logger.logger.Log("setting the MRE");
mre.Set();
}
});