for message in self.messageHandler: # which will resume if there is a message from kafka
while(WorkerPool.taskQueue.full() is True):
time.sleep(5)
- print "put package %s into queue" % message.value
+ print "put package %s into queue" % message.value
+ WorkerPool.taskQueue.put(message.value)
+class SpecialProducer(Process):
+ def __init__(self):
+ super(SpecialProducer, self).__init__()
+ self.messageHandler = KafkaConsumer('tizen-unified-large-packages',group_id='tizen-worker',bootstrap_servers='109.123.100.144:9092')
+ def run(self):
+ print "start special produce, os pid: %d" % (os.getpid())
+ for message in self.messageHandler: # which will resume if there is a message from kafka
+ while(WorkerPool.taskQueue.full() is True):
+ time.sleep(5)
+ print "put special package %s into queue" % message.value
WorkerPool.taskQueue.put(message.value)
class GbsBuild(object):
super(Consumer,self).__init__()
self.partition = partition
self.threadId = threadId
-
+
def run(self):
print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid())
while True:
else:
print "send fail"
self.messageHandler.close()
-
+
class WorkerPool(object):
capcaticy = cpu_count()
curThreadNum = 0
taskQueue = multiprocessing.Queue(capcaticy*100)
def __init__(self):
+ self.specialProducer = None
+ if(WorkerPool.capcaticy == 8):
+ self.specialProducer = SpecialProducer()
self.producer = Producer()
self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy)]
def start(self):
self.producer.start()
for i in range(0, WorkerPool.capcaticy):
self.consumers[i].start()
+ if(self.specialProducer):
+ self.specialProducer.daemon = True
+ self.specialProducer.start()
self.producer.join()
self.producer.messageHandler.close()
+ if(self.specialProducer):
+ self.specialProducer.join()
+ self.specialProducer.messageHandler.close()
for i in range(0, WorkerPool.capcaticy):
self.consumers[i].join()
wp = WorkerPool()