Add TCP socket implementation for output stream. (#53)
authorМихаил Куринной/AI Ecosystem Lab /SRR/Engineer/삼성전자 <m.kurinnoi@samsung.com>
Tue, 5 Feb 2019 10:21:14 +0000 (13:21 +0300)
committerAlexander Soldatov/AI Ecosystem Lab /SRR/Staff Engineer/삼성전자 <soldatov.a@samsung.com>
Tue, 5 Feb 2019 10:21:14 +0000 (13:21 +0300)
src/track/CMakeLists.txt
src/track/libheaptrack.cpp
src/track/outstream/outstream.cpp
src/track/outstream/outstream_file.cpp
src/track/outstream/outstream_socket.cpp [new file with mode: 0644]
src/track/outstream/outstream_socket.h [new file with mode: 0644]

index 6ad255dc8bc60b32f18a0b63e92c8facfbf884ac..6dd29c5be5bcccaf3859317d58546ef4658a409e 100644 (file)
@@ -20,6 +20,7 @@ add_library(heaptrack_preload MODULE
     libheaptrack.cpp
     outstream/outstream.cpp
     outstream/outstream_file.cpp
+    outstream/outstream_socket.cpp
 )
 
 target_compile_options(heaptrack_preload PRIVATE "-ftls-model=initial-exec")
@@ -48,6 +49,7 @@ add_library(heaptrack_inject MODULE
     libheaptrack.cpp
     outstream/outstream.cpp
     outstream/outstream_file.cpp
+    outstream/outstream_socket.cpp
 )
 
 target_link_libraries(heaptrack_inject LINK_PRIVATE
index d50cc1d38bd0c7d45337352867fa998477ebbbe5..f93acfba06500c16110a4ef588000c3a5f8e652e 100644 (file)
@@ -1,4 +1,4 @@
-/*
+/*
  * Copyright 2014-2017 Milian Wolff <mail@milianw.de>
  *
  * This library is free software; you can redistribute it and/or
@@ -54,6 +54,7 @@
 #include "util/config.h"
 #include "util/libunwind_config.h"
 #include "outstream/outstream_file.h"
+#include "outstream/outstream_socket.h"
 
 /**
  * uncomment this to get extended debug code for known pointers
@@ -139,6 +140,32 @@ outStream* createFile(const char* fileName)
     } else if (outputFileName == "stderr") {
         debugLog<VerboseOutput>("%s", "will write to stderr");
         return OpenStream<outStreamFILE, FILE*>(stderr);
+    } else if (outputFileName == "socket") {
+        uint16_t Port = outStreamSOCKET::DefaultSocketPort;
+        char *env = nullptr;
+        env = getenv("DUMP_HEAPTRACK_SOCKET");
+        if (env) {
+            try {
+                int tmpPort = std::stoi(std::string(env));
+                if (tmpPort < outStreamSOCKET::MinAllowedSocketPort
+                    || tmpPort > outStreamSOCKET::MaxAllowedSocketPort) {
+                    fprintf(stderr, "WARNING: DUMP_HEAPTRACK_SOCKET socket port is out of allowed range.\n");
+                    throw std::out_of_range("DUMP_HEAPTRACK_SOCKET socket port is out of allowed range");
+                }
+                Port = static_cast<uint16_t>(tmpPort);
+            } catch (...) {
+                // do nothing, use default port
+                fprintf(stderr,
+                        "WARNING: DUMP_HEAPTRACK_SOCKET should be number in %i-%i range\n",
+                        outStreamSOCKET::MinAllowedSocketPort,
+                        outStreamSOCKET::MaxAllowedSocketPort);
+                fprintf(stderr, "WARNING: switched to default port %i\n",
+                        static_cast<int>(outStreamSOCKET::DefaultSocketPort));
+            }
+            unsetenv("DUMP_HEAPTRACK_SOCKET");
+        }
+        debugLog<VerboseOutput>("%s", "will write to socket");
+        return OpenStream<outStreamSOCKET, uint16_t>(Port);
     }
 
     if (outputFileName.empty()) {
index ebc4175d3d57f5225d506f2f37427e4ff1f46b2e..3cdbd8935ddffad3c1593e2f1c70ec94afa13286 100644 (file)
@@ -38,7 +38,7 @@ int fprintf(outStream *stream, const char* format, ...) noexcept
     }
 
     int ret = stream->Puts(Buf.get());
-    if (ret > 0) {
+    if (ret >= 0) {
         // make proper return code, since it different from fputs()
         ret = tmpStrSize;
     }
index 7b32567287a5b56b7cc93022d776587ef3cfe4de..54ab758c1d7e8280b445dbd38783c0d40e875209 100644 (file)
@@ -42,7 +42,7 @@ int outStreamFILE::Putc(int Char) noexcept
 {\r
     if (!Stream_) {\r
         errno = EIO;\r
-        return -1;\r
+        return EOF;\r
     }\r
     return fputc(Char, Stream_);\r
 }\r
@@ -51,10 +51,10 @@ int outStreamFILE::Puts(const char *String) noexcept
 {\r
     if (!Stream_) {\r
         errno = EIO;\r
-        return -1;\r
+        return EOF;\r
     } else if (!String) {\r
         errno = EINVAL;\r
-        return -1;\r
+        return EOF;\r
     }\r
     return fputs(String, Stream_);\r
 }\r
diff --git a/src/track/outstream/outstream_socket.cpp b/src/track/outstream/outstream_socket.cpp
new file mode 100644 (file)
index 0000000..2311cbe
--- /dev/null
@@ -0,0 +1,165 @@
+#include <arpa/inet.h>\r
+#include <cassert>\r
+#include <netinet/in.h>\r
+#include <netinet/tcp.h>\r
+#include <stdexcept>\r
+#include <string.h>\r
+#include <unistd.h>\r
+\r
+#include "outstream_socket.h"\r
+\r
+outStreamSOCKET::outStreamSOCKET(uint16_t Port) :\r
+    Socket_(-1),\r
+    BufferUsedSize_(0),\r
+    Buffer_(new char[BufferCapacity_])\r
+{\r
+    int tmpSocketID = -1;\r
+    auto HandleError = [&tmpSocketID] (const char *ErrorText, int Error) {\r
+        if (tmpSocketID != -1) {\r
+            close(tmpSocketID);\r
+        }\r
+        fprintf(stderr, "WARNING! %s: %s\n", ErrorText, strerror(Error));\r
+        throw std::runtime_error(ErrorText);\r
+    };\r
+\r
+    tmpSocketID = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);\r
+    if (tmpSocketID == -1) {\r
+        HandleError("socket()", errno);\r
+    }\r
+\r
+    int on = 1;\r
+    if (setsockopt(tmpSocketID, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1\r
+        || setsockopt(tmpSocketID, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) == -1) {\r
+        HandleError("setsockopt()", errno);\r
+    }\r
+\r
+    struct sockaddr_in tmpServerAddr;\r
+    memset((char*)&tmpServerAddr, 0, sizeof(tmpServerAddr));\r
+    tmpServerAddr.sin_family = AF_INET;\r
+    tmpServerAddr.sin_port = htons(Port);\r
+    tmpServerAddr.sin_addr.s_addr = INADDR_ANY;\r
+    if (bind(tmpSocketID, (struct sockaddr *) &tmpServerAddr, sizeof(tmpServerAddr)) == -1) {\r
+        HandleError("bind()", errno);\r
+    }\r
+\r
+    if (listen(tmpSocketID, 1) == -1) {\r
+        HandleError("listen()", errno);\r
+    }\r
+\r
+    struct sockaddr_storage tmpServerStorage;\r
+    socklen_t addr_size = sizeof tmpServerStorage;\r
+    Socket_ = accept(tmpSocketID, (struct sockaddr*)&tmpServerStorage, &addr_size);\r
+    if (Socket_ == -1) {\r
+        HandleError("accept()", errno);\r
+    }\r
+\r
+    close(tmpSocketID);\r
+}\r
+\r
+outStreamSOCKET::~outStreamSOCKET()\r
+{\r
+    if (Socket_ == -1) {\r
+        return;\r
+    }\r
+\r
+    FlushBuffer();\r
+    close(Socket_);\r
+}\r
+\r
+bool outStreamSOCKET::SocketErrorDetected() noexcept\r
+{\r
+    int SocketErrno;\r
+    unsigned int ErrnoSize = sizeof(SocketErrno);\r
+    if (getsockopt(Socket_, SOL_SOCKET, SO_ERROR, &SocketErrno, &ErrnoSize) == -1) {\r
+        return true;\r
+    }\r
+\r
+    if (SocketErrno) {\r
+        close(Socket_);\r
+        Socket_ = -1;\r
+        fprintf(stderr, "WARNING! Unable to use socket: %s", strerror(SocketErrno));\r
+        errno = SocketErrno;\r
+        return true;\r
+    }\r
+\r
+    return false;\r
+}\r
+\r
+bool outStreamSOCKET::BufferedWriteToSocket(const void *Data, size_t Count) noexcept\r
+{\r
+    if (Count > BufferCapacity_) {\r
+        if (!FlushBuffer()) {\r
+            return false;\r
+        }\r
+        return SendToSocket(Data, Count);\r
+    }\r
+\r
+    if (Count > AvailableSpace()) {\r
+        if (!FlushBuffer()) {\r
+            return false;\r
+        }\r
+    }\r
+\r
+    CopyToBuffer(Data, Count);\r
+    return true;\r
+}\r
+\r
+bool outStreamSOCKET::SendToSocket(const void *Data, size_t Count) noexcept\r
+{\r
+    if (SocketErrorDetected()) {\r
+        return false;\r
+    }\r
+    if (Count == 0) {\r
+        return true;\r
+    }\r
+    return send(Socket_, Data, Count, MSG_NOSIGNAL) != -1;\r
+}\r
+\r
+void outStreamSOCKET::CopyToBuffer(const void *Data, size_t Count) noexcept\r
+{\r
+    memcpy(BufferPos(), Data, Count);\r
+    BufferUsedSize_ += Count;\r
+}\r
+\r
+bool outStreamSOCKET::FlushBuffer() noexcept\r
+{\r
+    if (Socket_ == -1) {\r
+        errno = EIO;\r
+        return false;\r
+    }\r
+    bool ret = SendToSocket(Buffer_.get(), BufferUsedSize_);\r
+    BufferUsedSize_ = 0;\r
+    return ret;\r
+}\r
+\r
+int outStreamSOCKET::Putc(int Char) noexcept\r
+{\r
+    if (Socket_ == -1) {\r
+        errno = EIO;\r
+        return EOF;\r
+    }\r
+\r
+    // same behavior as for fputc()\r
+    unsigned char tmpChar = static_cast<unsigned char>(Char);\r
+    if (BufferedWriteToSocket(&tmpChar, sizeof(unsigned char)))\r
+        return Char;\r
+\r
+    return EOF;\r
+}\r
+\r
+int outStreamSOCKET::Puts(const char *String) noexcept\r
+{\r
+    if (Socket_ == -1) {\r
+        errno = EIO;\r
+        return EOF;\r
+    } else if (!String) {\r
+        errno = EINVAL;\r
+        return EOF;\r
+    }\r
+\r
+    // same behavior as for fputs()\r
+    if (BufferedWriteToSocket(String, strlen(String)))\r
+        return 1; // return a nonnegative number on success\r
+\r
+    return EOF;\r
+}\r
diff --git a/src/track/outstream/outstream_socket.h b/src/track/outstream/outstream_socket.h
new file mode 100644 (file)
index 0000000..c02de54
--- /dev/null
@@ -0,0 +1,68 @@
+#ifndef OUTSTREAMSOCKET_H
+#define OUTSTREAMSOCKET_H
+
+#include <memory>
+#include <cassert>
+#include "outstream.h"
+
+class outStreamSOCKET final : public outStream
+{
+public:
+    outStreamSOCKET() = delete;
+    explicit outStreamSOCKET(uint16_t Port);
+    ~outStreamSOCKET();
+
+    outStreamSOCKET(const outStreamSOCKET &) = delete;
+    outStreamSOCKET &operator = (const outStreamSOCKET &) = delete;
+
+    outStreamSOCKET(outStreamSOCKET &&other) :
+        Socket_(other.Socket_),
+        BufferUsedSize_(other.BufferUsedSize_),
+        Buffer_(std::move(other.Buffer_))
+    {
+        other.Socket_ = -1;
+        other.BufferUsedSize_ = 0;
+    }
+    outStreamSOCKET &operator = (outStreamSOCKET &&other) {
+        Socket_ = other.Socket_;
+        other.Socket_ = -1;
+        BufferUsedSize_ = other.BufferUsedSize_;
+        other.BufferUsedSize_ = 0;
+        Buffer_ = std::move(other.Buffer_);
+        return *this;
+    }
+
+    int Putc(int Char) noexcept override;
+    int Puts(const char *String) noexcept override;
+
+    static constexpr int MinAllowedSocketPort = 1;
+    static constexpr int MaxAllowedSocketPort = 65535;
+    static constexpr uint16_t DefaultSocketPort = 5050;
+
+private:
+    bool SocketErrorDetected() noexcept;
+    bool BufferedWriteToSocket(const void *Data, size_t Count) noexcept;
+    bool SendToSocket(const void *Data, size_t Count) noexcept;
+    void CopyToBuffer(const void *Data, size_t Count) noexcept;
+    bool FlushBuffer() noexcept;
+
+    size_t AvailableSpace() const noexcept
+    {
+        assert(BufferCapacity_ >= BufferUsedSize_);
+        return BufferCapacity_ - BufferUsedSize_;
+    }
+
+    char *BufferPos() const noexcept
+    {
+        assert(BufferCapacity_ >= BufferUsedSize_);
+        return Buffer_.get() + BufferUsedSize_;
+    }
+
+    int Socket_;
+    // mainly, heaptrack send small blocks (about 1-50 bytes each)
+    static constexpr size_t BufferCapacity_ = 4096;
+    size_t BufferUsedSize_;
+    std::unique_ptr<char[]> Buffer_;
+};
+
+#endif // OUTSTREAMSOCKET_H