--- /dev/null
+import struct
+import feature
+from .. import handler
+from .. import client_lib
+
+
+class LSanFeature(feature.Feature):
+
+ __CONFIG_LSAN = 'lsan'
+ __PROTOCOL_LSAN = {'MEMORY_ALLOC_ALWAYS_PROBING', 'LSAN'}
+ __MSG_LSAN_STR = 'MSG_LSAN'
+ __LSAN_MSG_ERR = 0x0
+ __LSAN_MSG_STATUS = 0x1
+ __LSAN_MSG_REPORT = 0x2
+ __LSAN_INIT_DONE = 0x4
+ __STATUS_FMT = 'I'
+ __HEAD_FMT = 'III1c'
+
+ def __init__(self, handler, intr, outdir):
+ self.__handler = handler
+ self.__intr = intr
+ self.__outdir = outdir
+ self.__handler.register_cb(self.__MSG_LSAN_STR, self.__lsan_handler)
+
+ def __get_filepath(self, data):
+ size = struct.calcsize(self.__HEAD_FMT)
+ first_loop = True
+ cnt = size
+ c = ''
+ path = ''
+ while first_loop or c != '\0':
+ # This prevents adding to path null-termination
+ path += c
+ first_loop = False
+ c = data[cnt]
+ cnt += 1
+ return path
+
+ def __pull_lsan_report(self, path):
+ self.__intr.pull(path, self.__outdir)
+
+ def __lsan_handler(self, data):
+ parsed = struct.unpack_from(self.__STATUS_FMT, data)
+
+ if parsed[0] == self.__LSAN_MSG_REPORT:
+ path = self.__get_filepath(data)
+ self.__pull_lsan_report(path)
+
+ @classmethod
+ def resolve_features(cls, features_list):
+ if cls.__CONFIG_LSAN in features_list:
+ return cls.__PROTOCOL_LSAN
+ else:
+ return None
from .features import feature
from .features import common
+from .features import lsan
class FeaturesHandler(object):
- __features_list = [common.CommonFeature]
+ __features_list = [common.CommonFeature, lsan.LSanFeature]
def __init__(self, handler=None, intr=None, outdir=None):
from __future__ import absolute_import
import copy
from . import v4_1
+from . import format_descriptor
def proto_data():
# raise ValueError('Not implemented')
data = copy.deepcopy(v4_1.proto_data())
+ # TODO make it better way
+ feature_dict = data[format_descriptor.ProtoData.FEATURES]
+ feature_dict['LSAN'] = 0x4000000000000
+
+ data_msgs = data[format_descriptor.ProtoData.DATA_MESSAGES]
+ data_msgs['MSG_LSAN'] = 0x0022
+
return data