1: adjust messageHandler to more safe. 2: adjust process number according to current...
authorjingui.ren <jingui.ren@samsung.com>
Tue, 22 Jan 2019 03:01:36 +0000 (11:01 +0800)
committerjingui.ren <jingui.ren@samsung.com>
Tue, 22 Jan 2019 03:01:36 +0000 (11:01 +0800)
Change-Id: Ie02c12767610606dc7f6afe19290d627dab03b59

tools/worker.py

index a816b143d79167a8c19ca1bad1cd18ad73f1202b..a7d8fc1a0f2e72670554875a3a7425f11fe91ee2 100755 (executable)
@@ -7,7 +7,7 @@ import Queue
 import time
 import threading
 import multiprocessing
-from multiprocessing import Process
+from multiprocessing import Process, cpu_count
 from kafka import KafkaConsumer
 from kafka import KafkaProducer
 
@@ -50,7 +50,7 @@ class Consumer(Process):
                super(Consumer,self).__init__()
                self.partition = partition
                self.threadId = threadId
-               self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.144:9092")
+               
        def run(self):
                print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid())
                while True:
@@ -60,6 +60,7 @@ class Consumer(Process):
                        print "thread %d start %s package " % (self.threadId, packageName)
                        gbsbuild = GbsBuild(packageName,self.threadId)
                        print "thread %d building %s package" % (self.threadId, packageName)
+                       self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.137:9092")
                        if gbsbuild.build() == "Success":
                        #if True:
                                result = self.messageHandler.send("tizen-unified-status", value = "succeed", key = packageName, partition=0)
@@ -73,9 +74,10 @@ class Consumer(Process):
                                        print "send success"
                                else:
                                        print "send fail"
+                       self.messageHandler.close()
                        
 class WorkerPool(object):
-       capcaticy = 4
+       capcaticy = cpu_count()
        curThreadNum = 0
        taskQueue = multiprocessing.Queue(capcaticy*100)
        def __init__(self):
@@ -91,6 +93,7 @@ class WorkerPool(object):
                for i in range(0, WorkerPool.capcaticy):
                        self.consumers[i].start()
                self.producer.join()
+               self.producer.messageHandler.close()
                for i in range(0, WorkerPool.capcaticy):
                        self.consumers[i].join()
 wp = WorkerPool()