From: Mikhail Kurinnoi Date: Wed, 3 May 2023 19:12:35 +0000 (+0300) Subject: Improve synchronization, minor code fixes. X-Git-Tag: accepted/tizen/unified/20230616.172407~2 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=6b421b8188e55a85602cddb077f4f157b81098fe;p=sdk%2Ftools%2Fcoreprofiler.git Improve synchronization, minor code fixes. --- diff --git a/src/sync/sharedresource.h b/src/sync/sharedresource.h index b16ee3d..3e39147 100644 --- a/src/sync/sharedresource.h +++ b/src/sync/sharedresource.h @@ -130,25 +130,6 @@ public: using A::A; // Protected constructor. }; - template - class ConstAccessor : public A - { - friend class SharedResource; - - public: - const T *operator->() const - { - return &this->m_shared_resource->m_resource; - } - - const T &operator*() const - { - return this->m_shared_resource->m_resource; - } - - using A::A; // Protected constructor. - }; - template SharedResource(Args&& ...args) : m_resource(std::forward(args)...) @@ -177,17 +158,10 @@ public: return this; } - auto lock_const() const -> - ConstAccessor>> - { - // Implicit conversion to accessor with constant exclusive lock. - return this; - } - - auto lock_shared() const -> - ConstAccessor>> + auto lock_shared() -> + MutableAccessor>> { - // Implicit conversion to accessor with constant shared lock. + // Implicit conversion to accessor with mutable shared lock. return this; } diff --git a/src/trace/commontrace.cpp b/src/trace/commontrace.cpp index d433341..6a89793 100644 --- a/src/trace/commontrace.cpp +++ b/src/trace/commontrace.cpp @@ -56,13 +56,10 @@ static void SampleHandlerStub( return; } - CommonTrace *trace = - reinterpret_cast(siginfo->si_value.sival_ptr); - - if (trace != nullptr && trace->IsEnabled()) + if (g_pCommonTraceObject != nullptr && g_pCommonTraceObject->IsEnabled()) { int terrno = errno; - trace->HandleSample(context); + g_pCommonTraceObject->HandleSample(context); errno = terrno; } } @@ -289,7 +286,6 @@ void CommonTrace::Shutdown() noexcept __forceinline void CommonTrace::SendDoSample(ThreadInfo &thrInfo) noexcept { union sigval val; - val.sival_ptr = this; int ev = pthread_sigqueue(thrInfo.nativeHandle, SAMPLE_SIGNAL, val); // It is OK if the limit of signals which may be queued has been reached. if (ev && ev != EAGAIN) @@ -305,7 +301,7 @@ __forceinline void CommonTrace::SendDoLog(ThreadInfo &thrInfo) noexcept _ASSERTE(m_logThread.joinable()); union sigval val; - val.sival_ptr = &thrInfo; + val.sival_ptr = (void*)thrInfo.id; int ev = pthread_sigqueue(m_logThread.native_handle(), LOG_SIGNAL, val); // It is OK if the limit of signals which may be queued has been reached. if (ev && ev != EAGAIN) @@ -408,17 +404,22 @@ void CommonTrace::LogThread(binary_semaphore *pInitialized) noexcept _ASSERTE(siginfo.si_signo == LOG_SIGNAL); - ThreadInfo *pThreadInfo = - reinterpret_cast(siginfo.si_value.sival_ptr); + auto storage_lock = m_threadStorage.lock_shared(); + + if (!storage_lock->HasValue((ThreadID)siginfo.si_value.sival_ptr)) + continue; + + ThreadInfo &thrInfo = storage_lock->Get((ThreadID)siginfo.si_value.sival_ptr); + for ( // Local copy of volatile data. - size_t count = pThreadInfo->eventChannel.GetEventSummaryCount(); + size_t count = thrInfo.eventChannel.GetEventSummaryCount(); count > 0; --count) { const EventSummary &summary = - pThreadInfo->eventChannel.GetCurrentEventSummary(); - TRACE().DumpSample(pThreadInfo->internalId, summary); - pThreadInfo->eventChannel.NextEventSummary(); + thrInfo.eventChannel.GetCurrentEventSummary(); + TRACE().DumpSample(thrInfo.internalId, summary); + thrInfo.eventChannel.NextEventSummary(); } } } @@ -481,7 +482,7 @@ void CommonTrace::SamplingThread(binary_semaphore *pInitialized) noexcept if (itThrInfo == endThrInfo) { { - auto storage_lock = this->GetThreadStorage(); + auto storage_lock = this->GetThreadStorageShared(); liveThreads = storage_lock->GetLiveContainer(); } itThrInfo = liveThreads.begin(); @@ -491,7 +492,7 @@ void CommonTrace::SamplingThread(binary_semaphore *pInitialized) noexcept if (!liveThreads.empty()) { - auto storage_lock = this->GetThreadStorage(); + auto storage_lock = this->GetThreadStorageShared(); ThreadInfo &thrInfo = *itThrInfo++; // We update all live threads if they are attached to OS @@ -1088,7 +1089,7 @@ HRESULT CommonTrace::ThreadAssignedToOSThread( { InternalID threadIid; { - auto storage_lock = m_threadStorage.lock(); + auto storage_lock = m_threadStorage.lock_shared(); ThreadInfo &thrInfo = storage_lock->Get(managedThreadId); if (thrInfo.osThreadId != osThreadId) diff --git a/src/trace/commontrace.h b/src/trace/commontrace.h index e66caa5..7176a6f 100644 --- a/src/trace/commontrace.h +++ b/src/trace/commontrace.h @@ -167,7 +167,7 @@ public: return m_threadStorage.lock(); } - auto GetThreadStorage() const -> decltype(m_threadStorage.lock_shared()) + auto GetThreadStorageShared() -> decltype(m_threadStorage.lock_shared()) { return m_threadStorage.lock_shared(); } @@ -177,7 +177,7 @@ public: return m_classStorage.lock(); } - auto GetClassStorage() const -> decltype(m_classStorage.lock_shared()) + auto GetClassStorageShared() -> decltype(m_classStorage.lock_shared()) { return m_classStorage.lock_shared(); } diff --git a/src/trace/cputrace.cpp b/src/trace/cputrace.cpp index c0511b3..416f6d3 100644 --- a/src/trace/cputrace.cpp +++ b/src/trace/cputrace.cpp @@ -142,7 +142,7 @@ void CpuTrace::LogThread() noexcept if (m_profiler.GetConfig().CpuTraceThreadEnabled) { auto storage_lock = - m_profiler.GetCommonTrace().GetThreadStorage(); + m_profiler.GetCommonTrace().GetThreadStorageShared(); for (ThreadInfo &thrInfo : storage_lock->GetLiveRange()) { if (m_disabled == true) diff --git a/src/tracelog/tracefmt.h b/src/tracelog/tracefmt.h index ab9b4d0..559b06f 100644 --- a/src/tracelog/tracefmt.h +++ b/src/tracelog/tracefmt.h @@ -22,8 +22,6 @@ #include #include #include -#include -#include #include @@ -117,28 +115,18 @@ class tracefmt private: class record { - private: - struct deleter - { - void operator()(std::ostream *stream) - { - *stream << std::endl; - } - }; - public: - record(const char *str, std::ostream *stream, std::mutex &mutex) - : m_lock(mutex) - , m_stream(stream) + record(const char *str, std::ostream &stream) + : m_stream(stream) { - *m_stream << str; + m_stream << str; } template record& str(T&& value) { sep(); - *m_stream << std::forward(value); + m_stream << std::forward(value); return *this; } @@ -174,7 +162,7 @@ private: record& qtn(T&& value) { sep(); - *m_stream << '"' << std::forward(value) << '"'; + m_stream << '"' << std::forward(value) << '"'; return *this; } @@ -187,29 +175,29 @@ private: record& hex(DWORD64 value) { sep(); - *m_stream << hexnum(value); + m_stream << hexnum(value); return *this; } record& hex32(DWORD value) { sep(); - *m_stream << hexnum32(value); + m_stream << hexnum32(value); return *this; } record& hex64(DWORD64 value) { sep(); - *m_stream << hexnum64(value); + m_stream << hexnum64(value); return *this; } record& systime(const SYSTEMTIME& systime) { - char fill = m_stream->fill('0'); + char fill = m_stream.fill('0'); sep(); - *m_stream + m_stream << std::setw(4) << systime.wYear << '-' << std::setw(2) << systime.wMonth << '-' << std::setw(2) << systime.wDay @@ -217,13 +205,13 @@ private: << ':' << std::setw(2) << systime.wMinute << ':' << std::setw(2) << systime.wSecond << '.' << std::setw(3) << systime.wMilliseconds; - m_stream->fill(fill); + m_stream.fill(fill); return *this; } record& bias(LONG bias) { - char fill = m_stream->fill('0'); + char fill = m_stream.fill('0'); char sign = '+'; if (bias < 0) @@ -235,11 +223,11 @@ private: ldiv_t hour_minute = ldiv(bias, 60); sep(); - *m_stream + m_stream << sign << std::setw(2) << hour_minute.quot << ':' << std::setw(2) << hour_minute.rem; - m_stream->fill(fill); + m_stream.fill(fill); return *this; } @@ -248,7 +236,7 @@ private: record& config(T&& value) { sep(); - *m_stream << convert(std::forward(value)); + m_stream << convert(std::forward(value)); return *this; } @@ -287,7 +275,7 @@ private: if (ip == 0) { sep(); - *m_stream << '?'; + m_stream << '?'; return *this; } else @@ -314,7 +302,7 @@ private: record& id(InternalID id) { sep(); - *m_stream << id; + m_stream << id; return *this; } @@ -398,25 +386,29 @@ private: switch (m_grp_state) { case grp_state::NOT_IN_GROUP: - *m_stream << ' '; + m_stream << ' '; break; case grp_state::FIRST_IN_GROUP: m_grp_state = grp_state::IN_GROUP; - *m_stream << ' '; + m_stream << ' '; break; case grp_state::IN_GROUP: if (m_sep != -1) { - *m_stream << static_cast(m_sep); + m_stream << static_cast(m_sep); } break; } return *this; } + void end() + { + m_stream << std::endl; + } + private: - std::unique_lock m_lock; - std::unique_ptr m_stream; + std::ostream &m_stream; enum class grp_state { NOT_IN_GROUP, @@ -428,24 +420,16 @@ private: }; public: - explicit tracefmt(std::ostream& stream) - : m_stream(&stream) - {} - - tracefmt() : tracefmt(std::cout) {} + tracefmt() = default; tracefmt(const tracefmt&) = delete; tracefmt &operator=(const tracefmt&) = delete; - record log(const char *str) + record log(const char *str, std::ostream& stream) { - return record(str, m_stream, m_mutex); + return record(str, stream); } - -private: - std::ostream *m_stream; - std::mutex m_mutex; }; } diff --git a/src/tracelog/tracelog.cpp b/src/tracelog/tracelog.cpp index 755152b..2cd6a9f 100644 --- a/src/tracelog/tracelog.cpp +++ b/src/tracelog/tracelog.cpp @@ -17,30 +17,49 @@ #include #include #include +#include +#include +#include +#include #include "tracefmt.h" #include "tracelog.h" #include "osocketstream.h" +struct trace_queue_elem_t +{ + std::unique_ptr data; + std::size_t size = 0; + std::size_t capacity = 0; +}; +typedef std::list trace_queue_t; + +static thread_local std::stringstream g_tls_ss; +static thread_local trace_queue_t g_tls_queue; + + class TraceLog final : public ITraceLog { public: TraceLog(StdOutStream_t) : m_stream(std::cout) , m_stream_owner(false) - , m_tracefmt(m_stream) + , m_needStopWorker(false) + , m_worker{&TraceLog::Worker, this} {} TraceLog(StdErrStream_t) : m_stream(std::cerr) , m_stream_owner(false) - , m_tracefmt(m_stream) + , m_needStopWorker(false) + , m_worker{&TraceLog::Worker, this} {} TraceLog(FileStream_t, const std::string &filename) : m_stream(*new std::ofstream()) , m_stream_owner(true) - , m_tracefmt(m_stream) + , m_needStopWorker(false) + , m_worker{&TraceLog::Worker, this} { try { @@ -67,7 +86,8 @@ public: TraceLog(SocketStream_t, uint16_t port, const std::string &SocketPrompt) : m_stream(*new net::osocketstream()) , m_stream_owner(true) - , m_tracefmt(m_stream) + , m_needStopWorker(false) + , m_worker{&TraceLog::Worker, this} { try { @@ -97,6 +117,13 @@ public: virtual ~TraceLog() { + // Shutdown worker. + m_mutex.lock(); + m_needStopWorker = true; + m_mutex.unlock(); + m_workerCV.notify_one(); + m_worker.join(); + if (m_stream_owner) { delete &m_stream; @@ -111,48 +138,40 @@ public: const SYSTEMTIME &systime, LONG bias) override { - m_tracefmt.log("prf stm").systime(systime).bias(bias); + m_tracefmt.log("prf stm", g_tls_ss).systime(systime).bias(bias).end(); + AddTLSDataToQueue(); } virtual void DumpProfilerConfig( const ProfilerConfig &config) override { - m_tracefmt.log("prf cfg").str("CollectionMethod") - .config(config.CollectionMethod); - m_tracefmt.log("prf cfg").str("SamplingTimeoutMs") - .config(config.SamplingTimeoutMs); - m_tracefmt.log("prf cfg").str("HighGranularityEnabled") - .config(config.HighGranularityEnabled); - m_tracefmt.log("prf cfg").str("TracingSuspendedOnStart") - .config(config.TracingSuspendedOnStart); - m_tracefmt.log("prf cfg").str("LineTraceEnabled") - .config(config.LineTraceEnabled); - m_tracefmt.log("prf cfg").str("CpuTraceProcessEnabled") - .config(config.CpuTraceProcessEnabled); - m_tracefmt.log("prf cfg").str("CpuTraceThreadEnabled") - .config(config.CpuTraceThreadEnabled); - m_tracefmt.log("prf cfg").str("CpuTraceTimeoutMs") - .config(config.CpuTraceTimeoutMs); - m_tracefmt.log("prf cfg").str("ExecutionTraceEnabled") - .config(config.ExecutionTraceEnabled); - m_tracefmt.log("prf cfg").str("MemoryTraceEnabled") - .config(config.MemoryTraceEnabled); - m_tracefmt.log("prf cfg").str("StackTrackingEnabled") - .config(config.StackTrackingEnabled); - m_tracefmt.log("prf cfg").str("GcAllocTableTraceEnabled") - .config(config.GcAllocTableTraceEnabled); + m_tracefmt.log("prf cfg", g_tls_ss).str("CollectionMethod").config(config.CollectionMethod).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("SamplingTimeoutMs").config(config.SamplingTimeoutMs).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("HighGranularityEnabled").config(config.HighGranularityEnabled).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("TracingSuspendedOnStart").config(config.TracingSuspendedOnStart).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("LineTraceEnabled").config(config.LineTraceEnabled).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("CpuTraceProcessEnabled").config(config.CpuTraceProcessEnabled).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("CpuTraceThreadEnabled").config(config.CpuTraceThreadEnabled).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("CpuTraceTimeoutMs").config(config.CpuTraceTimeoutMs).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("ExecutionTraceEnabled").config(config.ExecutionTraceEnabled).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("MemoryTraceEnabled").config(config.MemoryTraceEnabled).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("StackTrackingEnabled").config(config.StackTrackingEnabled).end(); + m_tracefmt.log("prf cfg", g_tls_ss).str("GcAllocTableTraceEnabled").config(config.GcAllocTableTraceEnabled).end(); + AddTLSDataToQueue(); } virtual void DumpProfilerTracingPause( DWORD ticks) override { - m_tracefmt.log("prf tps").ms(ticks); + m_tracefmt.log("prf tps", g_tls_ss).ms(ticks).end(); + AddTLSDataToQueue(); } virtual void DumpProfilerTracingResume( DWORD ticks) override { - m_tracefmt.log("prf trs").ms(ticks); + m_tracefmt.log("prf trs", g_tls_ss).ms(ticks).end(); + AddTLSDataToQueue(); } // @@ -163,7 +182,8 @@ public: DWORD timestamp, DWORD64 usage) override { - m_tracefmt.log("prc cpu").ms(timestamp).us(usage); + m_tracefmt.log("prc cpu", g_tls_ss).ms(timestamp).us(usage).end(); + AddTLSDataToQueue(); } // @@ -174,20 +194,23 @@ public: ThreadID threadId, InternalID threadIid) override { - m_tracefmt.log("thr crt").id(threadId).id(threadIid); + m_tracefmt.log("thr crt", g_tls_ss).id(threadId).id(threadIid).end(); + AddTLSDataToQueue(); } virtual void DumpThreadAssignedToOSThread( InternalID managedThreadIid, DWORD osThreadId) override { - m_tracefmt.log("thr aos").id(managedThreadIid).pid(osThreadId); + m_tracefmt.log("thr aos", g_tls_ss).id(managedThreadIid).pid(osThreadId).end(); + AddTLSDataToQueue(); } virtual void DumpThreadDestroyed( InternalID threadIid) override { - m_tracefmt.log("thr dst").id(threadIid); + m_tracefmt.log("thr dst", g_tls_ss).id(threadIid).end(); + AddTLSDataToQueue(); } virtual void DumpThreadTimes( @@ -195,7 +218,8 @@ public: DWORD timestamp, DWORD64 usage) override { - m_tracefmt.log("thr cpu").id(threadIid).ms(timestamp).us(usage); + m_tracefmt.log("thr cpu", g_tls_ss).id(threadIid).ms(timestamp).us(usage).end(); + AddTLSDataToQueue(); } // @@ -212,16 +236,16 @@ public: if (moduleName == nullptr) moduleName = W("UNKNOWN"); - m_tracefmt.log("mod ldf") - .id(moduleId).ptr(baseLoadAddress).id(assemblyId).hr(hrStatus) - .qtn(moduleName); + m_tracefmt.log("mod ldf", g_tls_ss).id(moduleId).ptr(baseLoadAddress).id(assemblyId).hr(hrStatus).qtn(moduleName).end(); + AddTLSDataToQueue(); } virtual void DumpModuleAttachedToAssembly( ModuleID moduleId, AssemblyID assemblyId) override { - m_tracefmt.log("mod ata").id(moduleId).id(assemblyId); + m_tracefmt.log("mod ata", g_tls_ss).id(moduleId).id(assemblyId).end(); + AddTLSDataToQueue(); } virtual void DumpAssemblyLoadFinished( @@ -234,9 +258,8 @@ public: if (assemblyName == nullptr) assemblyName = W("UNKNOWN"); - m_tracefmt.log("asm ldf") - .id(assemblyId).id(appDomainId).id(moduleId).hr(hrStatus) - .qtn(assemblyName); + m_tracefmt.log("asm ldf", g_tls_ss).id(assemblyId).id(appDomainId).id(moduleId).hr(hrStatus).qtn(assemblyName).end(); + AddTLSDataToQueue(); } virtual void DumpAppDomainCreationFinished( @@ -248,8 +271,8 @@ public: if (appDomainName == nullptr) appDomainName = W("UNKNOWN"); - m_tracefmt.log("apd crf") - .id(appDomainId).id(processId).hr(hrStatus).qtn(appDomainName); + m_tracefmt.log("apd crf", g_tls_ss).id(appDomainId).id(processId).hr(hrStatus).qtn(appDomainName).end(); + AddTLSDataToQueue(); } // @@ -260,15 +283,15 @@ public: const ClassInfo &info, HRESULT hrStatus) override { - m_tracefmt.log("cls ldf") - .id(info.id).id(info.internalId).id(info.moduleId) - .token(info.classToken).hr(hrStatus); + m_tracefmt.log("cls ldf", g_tls_ss).id(info.id).id(info.internalId).id(info.moduleId).token(info.classToken).hr(hrStatus).end(); + AddTLSDataToQueue(); } virtual void DumpClassName( const ClassInfo &info) override { - m_tracefmt.log("cls nam").id(info.internalId).qtn(info.fullName); + m_tracefmt.log("cls nam", g_tls_ss).id(info.internalId).qtn(info.fullName).end(); + AddTLSDataToQueue(); } // @@ -278,7 +301,7 @@ public: virtual void DumpFunctionInfo( const FunctionInfo &info) override { - auto log = m_tracefmt.log("fun inf"); + auto log = m_tracefmt.log("fun inf", g_tls_ss); log.id(info.internalId).id(info.id).id(info.classId).id(info.moduleId) .token(info.funcToken); for (const auto& ci : info.codeInfo) @@ -289,13 +312,15 @@ public: { log.il_map(m); } + log.end(); + AddTLSDataToQueue(); } virtual void DumpFunctionName( const FunctionInfo &info) override { - m_tracefmt.log("fun nam").id(info.internalId) - .qtn(info.fullName).qtn(info.returnType).qtn(info.signature); + m_tracefmt.log("fun nam", g_tls_ss).id(info.internalId).qtn(info.fullName).qtn(info.returnType).qtn(info.signature).end(); + AddTLSDataToQueue(); } // @@ -307,8 +332,8 @@ public: DWORD timestamp, FunctionID functionId) override { - m_tracefmt.log("jit cms").pid(osThreadId).ms(timestamp) - .id(functionId); + m_tracefmt.log("jit cms", g_tls_ss).pid(osThreadId).ms(timestamp).id(functionId).end(); + AddTLSDataToQueue(); } virtual void DumpJITCompilationFinished( @@ -317,8 +342,8 @@ public: FunctionID functionId, HRESULT hrStatus) override { - m_tracefmt.log("jit cmf").pid(osThreadId).ms(timestamp) - .id(functionId).hr(hrStatus); + m_tracefmt.log("jit cmf", g_tls_ss).pid(osThreadId).ms(timestamp).id(functionId).hr(hrStatus).end(); + AddTLSDataToQueue(); } virtual void DumpJITCachedFunctionSearchStarted( @@ -326,8 +351,8 @@ public: DWORD timestamp, FunctionID functionId) override { - m_tracefmt.log("jit css").pid(osThreadId).ms(timestamp) - .id(functionId); + m_tracefmt.log("jit css", g_tls_ss).pid(osThreadId).ms(timestamp).id(functionId).end(); + AddTLSDataToQueue(); } virtual void DumpJITCachedFunctionSearchFinished( @@ -335,8 +360,8 @@ public: DWORD timestamp, FunctionID functionId) override { - m_tracefmt.log("jit csf").pid(osThreadId).ms(timestamp) - .id(functionId); + m_tracefmt.log("jit csf", g_tls_ss).pid(osThreadId).ms(timestamp).id(functionId).end(); + AddTLSDataToQueue(); } // @@ -350,45 +375,51 @@ public: BOOL generationCollected[], COR_PRF_GC_REASON reason) override { - auto log = m_tracefmt.log("gch gcs"); + auto log = m_tracefmt.log("gch gcs", g_tls_ss); log.pid(osThreadId).ms(timestamp).str(reason); log.grps(); for (int i = 0; i < cGenerations; ++i) { log.str(generationCollected[i] ? 't' : 'f'); } - log.grpe(); + log.grpe().end(); + AddTLSDataToQueue(); } virtual void DumpGarbageCollectionFinished( DWORD osThreadId, DWORD timestamp) override { - m_tracefmt.log("gch gcf").pid(osThreadId).ms(timestamp); + m_tracefmt.log("gch gcf", g_tls_ss).pid(osThreadId).ms(timestamp).end(); + AddTLSDataToQueue(); } virtual void DumpGenerationBounds( DWORD timestamp, const std::vector &ranges) override { - auto log = m_tracefmt.log("gch gen"); + auto log = m_tracefmt.log("gch gen", g_tls_ss); log.ms(timestamp); for (const auto &range : ranges) { log.gen_range(range); } + log.end(); + AddTLSDataToQueue(); } virtual void DumpGcHeapAllocTable( DWORD timestamp, const AllocTable &allocInfoByTypes) override { - auto log = m_tracefmt.log("gch alt"); + auto log = m_tracefmt.log("gch alt", g_tls_ss); log.ms(timestamp); for (const auto &classIidAllocInfo : allocInfoByTypes) { log.alt_item(classIidAllocInfo.first, classIidAllocInfo.second); } + log.end(); + AddTLSDataToQueue(); } // @@ -401,7 +432,7 @@ public: { if (summary.HasStackSample()) { - auto log = m_tracefmt.log("sam str"); + auto log = m_tracefmt.log("sam str", g_tls_ss); log.id(threadIid).ms(summary.ticks).num(summary.count); log.grps(':'); log.num(summary.matchPrefixSize).num(summary.stackSize); @@ -414,11 +445,13 @@ public: { log.frame(frame); } + log.end(); + AddTLSDataToQueue(); } if (summary.HasAllocSample()) { - auto log = m_tracefmt.log("sam mem"); + auto log = m_tracefmt.log("sam mem", g_tls_ss); log.id(threadIid).ms(summary.ticks); for (const auto &classIidIpAllocInfo : summary.allocIpTable) { @@ -428,13 +461,105 @@ public: IpAllocInfo.second, IpAllocInfo.first); } } + log.end(); + AddTLSDataToQueue(); } } private: + std::mutex m_mutex; std::ostream& m_stream; bool m_stream_owner; tracefmt m_tracefmt; + bool m_needStopWorker; + std::thread m_worker; + std::condition_variable m_workerCV; + trace_queue_t m_workerQueue; + // Cache for used entries (reuse old in order to prevent memory allocations). + std::mutex m_cacheMutex; + trace_queue_t m_cache; + + void Worker() + { + std::unique_lock lock(m_mutex); + + trace_queue_t currentQueue; + + while (true) + { + while (m_workerQueue.empty()) + { + if (m_needStopWorker) + { + return; + } + + m_workerCV.wait(lock); + } + + currentQueue.splice(currentQueue.begin(), m_workerQueue); + + lock.unlock(); + + for (auto &entry : currentQueue) + { + m_stream.write(entry.data.get(), entry.size); + } + + // Store to cache for re-usage in future. + m_cacheMutex.lock(); + m_cache.splice(m_cache.end(), currentQueue); + m_cacheMutex.unlock(); + + lock.lock(); + } + } + + void AddTLSDataToQueue() + { + // Note, we reuse g_tls_ss's buffer and don't realloc it all the time for best speed. + std::size_t dataSize = g_tls_ss.rdbuf()->pubseekoff(0, std::ios_base::cur, std::ios_base::out); + + // Note, we use `g_tls_queue` only as temporary container in order to prevent its creation with memory allocation all the time. + assert(g_tls_queue.empty()); + + // Try to use cache first (reuse some old entry). + m_cacheMutex.lock(); + if (!m_cache.empty()) + { + // get one element from cache + g_tls_queue.splice(g_tls_queue.begin(), m_cache, std::prev(m_cache.end())); + } + m_cacheMutex.unlock(); + + if (g_tls_queue.empty()) + { + // create one element + g_tls_queue.emplace_front(); + auto &entry = g_tls_queue.front(); + entry.data.reset(new char[dataSize]); + entry.capacity = dataSize; + entry.size = dataSize; + } + else + { + auto &entry = g_tls_queue.front(); + if (entry.capacity < dataSize) + { + entry.data.reset(new char[dataSize]); + entry.capacity = dataSize; + } + entry.size = dataSize; + } + + g_tls_ss.rdbuf()->sgetn(g_tls_queue.front().data.get(), dataSize); + g_tls_ss.rdbuf()->pubseekpos(0); + + m_mutex.lock(); + m_workerQueue.splice(m_workerQueue.end(), g_tls_queue); + m_mutex.unlock(); + m_workerCV.notify_one(); + } }; // static