From: biao.wang Date: Wed, 6 Mar 2019 02:40:47 +0000 (+0800) Subject: Refine worker code --first version X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=refs%2Fheads%2Fdistributed-worker;p=tools%2Fgbs.git Refine worker code --first version Change-Id: I97aeaa13dc7c354cc7d9fedcacc7a92811f058aa --- diff --git a/tools/worker.py b/tools/worker.py index f730607..f525bff 100755 --- a/tools/worker.py +++ b/tools/worker.py @@ -74,21 +74,22 @@ class GbsBuild(object): os.system("rm -rf %s" % self.result) class Consumer(Process): - def __init__(self, threadId, partition = 0): + def __init__(self, threadId, topic): super(Consumer,self).__init__() - self.partition = partition self.threadId = threadId + self.topic = topic + def run(self): print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid()) while True: # while WorkerPool.taskQueue.empty() is True: # time.sleep(1) # packageName = WorkerPool.taskQueue.get() - self.messageConsumer = KafkaConsumer('tizen-unified', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092') - resultDict = self.messageConsumer.poll(36000,1) - - while not resultDict: - resultDict = self.messageConsumer.poll(36000, 1) + self.messageConsumer = KafkaConsumer(self.topic, group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092') + resultDict = self.messageConsumer.poll(36000, 1) + if not resultDict: + self.messageConsumer.close() + continue ConsumerRecords = resultDict.values() packageName = ConsumerRecords[0][0].value @@ -117,57 +118,15 @@ class Consumer(Process): gbsbuild.syncResult() print "sync done" -class LargeConsumer(Process): - def __init__(self, threadId, partition = 0): - super(LargeConsumer,self).__init__() - self.partition = partition - self.threadId = threadId - - def run(self): - print "start large consume thread %d , os pid: %d" % (self.threadId, os.getpid()) - while True: - # while WorkerPool.taskQueue.empty() is True: - # time.sleep(1) - # packageName = WorkerPool.taskQueue.get() - self.messageConsumer = KafkaConsumer('tizen-unified-large-packages', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092') - resultDict = self.messageConsumer.poll(36000, 1) - - while not resultDict: - resultDict = self.messageConsumer.poll(36000, 1) - - ConsumerRecords = resultDict.values() - packageName = ConsumerRecords[0][0].value - self.messageConsumer.close() - print "thread %d start %s package " % (self.threadId, packageName) - gbsbuild = GbsBuild(packageName,self.threadId) - print "thread %d building %s package" % (self.threadId, packageName) - self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.137:9092") - if gbsbuild.build() == "Success": - #if True: - result = self.messageHandler.send("tizen-unified-status", value = "succeed", key = packageName, partition=0) - if(result.get(60)): - print "send success" - else: - print "send fail" - else: - result = self.messageHandler.send("tizen-unified-status", value = "failed", key = packageName, partition=0) - if(result.get(60)): - print "send success" - else: - print "send fail" - self.messageHandler.close() - gbsbuild.syncResult() - print "sync done" - class WorkerPool(object): capcaticy = cpu_count() curThreadNum = 0 def __init__(self): if(WorkerPool.capcaticy == 8): - self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy - 1)] - self.consumers.append(LargeConsumer(WorkerPool.capcaticy - 1)) + self.consumers = [Consumer(i, 'tizen-unified') for i in xrange(WorkerPool.capcaticy - 1)] + self.consumers.append(Consumer(WorkerPool.capcaticy - 1, 'tizen-unified-large-packages')) else: - self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy)] + self.consumers = [Consumer(i, 'tizen-unified') for i in xrange(WorkerPool.capcaticy)] def start(self): #set daemon for i in range(0, WorkerPool.capcaticy): @@ -180,3 +139,5 @@ class WorkerPool(object): wp = WorkerPool() wp.start() print "Done" + +