import time
import threading
import multiprocessing
-from multiprocessing import Process
+from multiprocessing import Process, cpu_count
from kafka import KafkaConsumer
from kafka import KafkaProducer
super(Consumer,self).__init__()
self.partition = partition
self.threadId = threadId
- self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.144:9092")
+
def run(self):
print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid())
while True:
print "thread %d start %s package " % (self.threadId, packageName)
gbsbuild = GbsBuild(packageName,self.threadId)
print "thread %d building %s package" % (self.threadId, packageName)
+ self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.137:9092")
if gbsbuild.build() == "Success":
#if True:
result = self.messageHandler.send("tizen-unified-status", value = "succeed", key = packageName, partition=0)
print "send success"
else:
print "send fail"
+ self.messageHandler.close()
class WorkerPool(object):
- capcaticy = 4
+ capcaticy = cpu_count()
curThreadNum = 0
taskQueue = multiprocessing.Queue(capcaticy*100)
def __init__(self):
for i in range(0, WorkerPool.capcaticy):
self.consumers[i].start()
self.producer.join()
+ self.producer.messageHandler.close()
for i in range(0, WorkerPool.capcaticy):
self.consumers[i].join()
wp = WorkerPool()