add special large packages get class
authorjingui.ren <jingui.ren@samsung.com>
Thu, 24 Jan 2019 05:35:50 +0000 (13:35 +0800)
committerjingui.ren <jingui.ren@samsung.com>
Thu, 24 Jan 2019 05:39:17 +0000 (13:39 +0800)
Change-Id: Idff5610a51d3aeaeefe5ffa0f8b1483cded65cff

tools/worker.py

index a7d8fc1a0f2e72670554875a3a7425f11fe91ee2..6a4662e14e24860b4e91acfa190ea288b1077ed6 100755 (executable)
@@ -20,7 +20,18 @@ class Producer(Process):
                for message in self.messageHandler: # which will resume if there is a message from kafka
                        while(WorkerPool.taskQueue.full() is True):
                                time.sleep(5)
-                       print "put package %s into queue" % message.value 
+                       print "put package %s into queue" % message.value
+                       WorkerPool.taskQueue.put(message.value)
+class SpecialProducer(Process):
+       def __init__(self):
+               super(SpecialProducer, self).__init__()
+               self.messageHandler = KafkaConsumer('tizen-unified-large-packages',group_id='tizen-worker',bootstrap_servers='109.123.100.144:9092')
+       def run(self):
+               print "start special produce, os pid: %d" % (os.getpid())
+               for message in self.messageHandler: # which will resume if there is a message from kafka
+                       while(WorkerPool.taskQueue.full() is True):
+                               time.sleep(5)
+                       print "put special package %s into queue" % message.value
                        WorkerPool.taskQueue.put(message.value)
 
 class GbsBuild(object):
@@ -50,7 +61,7 @@ class Consumer(Process):
                super(Consumer,self).__init__()
                self.partition = partition
                self.threadId = threadId
-               
+
        def run(self):
                print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid())
                while True:
@@ -75,12 +86,15 @@ class Consumer(Process):
                                else:
                                        print "send fail"
                        self.messageHandler.close()
-                       
+
 class WorkerPool(object):
        capcaticy = cpu_count()
        curThreadNum = 0
        taskQueue = multiprocessing.Queue(capcaticy*100)
        def __init__(self):
+               self.specialProducer = None
+               if(WorkerPool.capcaticy == 8):
+                       self.specialProducer = SpecialProducer()
                self.producer = Producer()
                self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy)]
        def start(self):
@@ -92,8 +106,14 @@ class WorkerPool(object):
                self.producer.start()
                for i in range(0, WorkerPool.capcaticy):
                        self.consumers[i].start()
+               if(self.specialProducer):
+                       self.specialProducer.daemon = True
+                       self.specialProducer.start()
                self.producer.join()
                self.producer.messageHandler.close()
+               if(self.specialProducer):
+                       self.specialProducer.join()
+                       self.specialProducer.messageHandler.close()
                for i in range(0, WorkerPool.capcaticy):
                        self.consumers[i].join()
 wp = WorkerPool()