1:remove producer, and let mutex job to do by kafka, so worker can make the best...
authorjingui.ren <jingui.ren@samsung.com>
Thu, 31 Jan 2019 03:36:15 +0000 (11:36 +0800)
committerjingui.ren <jingui.ren@samsung.com>
Thu, 31 Jan 2019 03:47:03 +0000 (11:47 +0800)
2:add large consumer, which can receive large packages from 'tizen-unified-large-packages' topic.

Change-Id: I538f58728747e4670ffc09b07c99d6287645b715

tools/worker.py

index 6a4662e14e24860b4e91acfa190ea288b1077ed6..e51da3fb913823b103f4a0676580262c87c0fe47 100755 (executable)
@@ -11,29 +11,6 @@ from multiprocessing import Process, cpu_count
 from kafka import KafkaConsumer
 from kafka import KafkaProducer
 
-class Producer(Process):
-       def __init__(self):
-               super(Producer, self).__init__()
-               self.messageHandler = KafkaConsumer('tizen-unified',group_id='tizen-worker',bootstrap_servers='109.123.100.144:9092')
-       def run(self):
-               print "start produce, os pid: %d" % (os.getpid())
-               for message in self.messageHandler: # which will resume if there is a message from kafka
-                       while(WorkerPool.taskQueue.full() is True):
-                               time.sleep(5)
-                       print "put package %s into queue" % message.value
-                       WorkerPool.taskQueue.put(message.value)
-class SpecialProducer(Process):
-       def __init__(self):
-               super(SpecialProducer, self).__init__()
-               self.messageHandler = KafkaConsumer('tizen-unified-large-packages',group_id='tizen-worker',bootstrap_servers='109.123.100.144:9092')
-       def run(self):
-               print "start special produce, os pid: %d" % (os.getpid())
-               for message in self.messageHandler: # which will resume if there is a message from kafka
-                       while(WorkerPool.taskQueue.full() is True):
-                               time.sleep(5)
-                       print "put special package %s into queue" % message.value
-                       WorkerPool.taskQueue.put(message.value)
-
 class GbsBuild(object):
        errorRule = re.compile(r"some packages failed to be built")
        def __init__(self,packageName, id):
@@ -61,13 +38,64 @@ class Consumer(Process):
                super(Consumer,self).__init__()
                self.partition = partition
                self.threadId = threadId
-
        def run(self):
                print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid())
                while True:
-                       while WorkerPool.taskQueue.empty() is True:
-                               time.sleep(1)
-                       packageName = WorkerPool.taskQueue.get()
+                       # while WorkerPool.taskQueue.empty() is True:
+                       #       time.sleep(1)
+                       # packageName = WorkerPool.taskQueue.get()
+                       self.messageConsumer = KafkaConsumer('tizen-unified', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
+                       resultDict = self.messageConsumer.poll(36000,1)
+
+                       while not resultDict:
+                               resultDict = self.messageConsumer.poll(36000, 1)
+
+                       ConsumerRecords = resultDict.values()
+                       packageName = ConsumerRecords[0][0].value
+                       # # a record must be commit, and then can get next
+                       # commitResult = self.messageConsumer.commit()
+                       self.messageConsumer.close()
+                       # print commitResult
+                       print "thread %d start %s package " % (self.threadId, packageName)
+                       gbsbuild = GbsBuild(packageName, self.threadId)
+                       print "thread %d building %s package" % (self.threadId, packageName)
+                       self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.137:9092")
+                       if gbsbuild.build() == "Success":
+                       #if True:
+                               result = self.messageHandler.send("tizen-unified-status", value = "succeed", key = packageName, partition=0)
+                               if(result.get(60)):
+                                       print "send success"
+                               else:
+                                       print "send fail"
+                       else:
+                               result = self.messageHandler.send("tizen-unified-status", value = "failed", key = packageName, partition=0)
+                               if(result.get(60)):
+                                       print "send success"
+                               else:
+                                       print "send fail"
+                       self.messageHandler.close()
+
+class LargeConsumer(Process):
+       def __init__(self, threadId, partition = 0):
+               super(LargeConsumer,self).__init__()
+               self.partition = partition
+               self.threadId = threadId
+
+       def run(self):
+               print "start large consume thread %d , os pid: %d" % (self.threadId, os.getpid())
+               while True:
+                       # while WorkerPool.taskQueue.empty() is True:
+                       #       time.sleep(1)
+                       # packageName = WorkerPool.taskQueue.get()
+                       self.messageConsumer = KafkaConsumer('tizen-unified-large-packages', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
+                       resultDict = self.messageConsumer.poll(36000, 1)
+
+                       while not resultDict:
+                               resultDict = self.messageConsumer.poll(36000, 1)
+
+                       ConsumerRecords = resultDict.values()
+                       packageName = ConsumerRecords[0][0].value
+                       self.messageConsumer.close()
                        print "thread %d start %s package " % (self.threadId, packageName)
                        gbsbuild = GbsBuild(packageName,self.threadId)
                        print "thread %d building %s package" % (self.threadId, packageName)
@@ -90,30 +118,19 @@ class Consumer(Process):
 class WorkerPool(object):
        capcaticy = cpu_count()
        curThreadNum = 0
-       taskQueue = multiprocessing.Queue(capcaticy*100)
        def __init__(self):
-               self.specialProducer = None
                if(WorkerPool.capcaticy == 8):
-                       self.specialProducer = SpecialProducer()
-               self.producer = Producer()
-               self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy)]
+                       self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy - 1)]
+                       self.consumers.append(LargeConsumer(WorkerPool.capcaticy - 1))
+               else:
+                       self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy)]
        def start(self):
                #set daemon
-               self.producer.daemon = True
                for i in range(0, WorkerPool.capcaticy):
                        self.consumers[i].daemon = True
                print "start Worker pool"
-               self.producer.start()
                for i in range(0, WorkerPool.capcaticy):
                        self.consumers[i].start()
-               if(self.specialProducer):
-                       self.specialProducer.daemon = True
-                       self.specialProducer.start()
-               self.producer.join()
-               self.producer.messageHandler.close()
-               if(self.specialProducer):
-                       self.specialProducer.join()
-                       self.specialProducer.messageHandler.close()
                for i in range(0, WorkerPool.capcaticy):
                        self.consumers[i].join()
 wp = WorkerPool()