from kafka import KafkaConsumer
from kafka import KafkaProducer
-class Producer(Process):
- def __init__(self):
- super(Producer, self).__init__()
- self.messageHandler = KafkaConsumer('tizen-unified',group_id='tizen-worker',bootstrap_servers='109.123.100.144:9092')
- def run(self):
- print "start produce, os pid: %d" % (os.getpid())
- for message in self.messageHandler: # which will resume if there is a message from kafka
- while(WorkerPool.taskQueue.full() is True):
- time.sleep(5)
- print "put package %s into queue" % message.value
- WorkerPool.taskQueue.put(message.value)
-class SpecialProducer(Process):
- def __init__(self):
- super(SpecialProducer, self).__init__()
- self.messageHandler = KafkaConsumer('tizen-unified-large-packages',group_id='tizen-worker',bootstrap_servers='109.123.100.144:9092')
- def run(self):
- print "start special produce, os pid: %d" % (os.getpid())
- for message in self.messageHandler: # which will resume if there is a message from kafka
- while(WorkerPool.taskQueue.full() is True):
- time.sleep(5)
- print "put special package %s into queue" % message.value
- WorkerPool.taskQueue.put(message.value)
-
class GbsBuild(object):
errorRule = re.compile(r"some packages failed to be built")
def __init__(self,packageName, id):
super(Consumer,self).__init__()
self.partition = partition
self.threadId = threadId
-
def run(self):
print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid())
while True:
- while WorkerPool.taskQueue.empty() is True:
- time.sleep(1)
- packageName = WorkerPool.taskQueue.get()
+ # while WorkerPool.taskQueue.empty() is True:
+ # time.sleep(1)
+ # packageName = WorkerPool.taskQueue.get()
+ self.messageConsumer = KafkaConsumer('tizen-unified', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
+ resultDict = self.messageConsumer.poll(36000,1)
+
+ while not resultDict:
+ resultDict = self.messageConsumer.poll(36000, 1)
+
+ ConsumerRecords = resultDict.values()
+ packageName = ConsumerRecords[0][0].value
+ # # a record must be commit, and then can get next
+ # commitResult = self.messageConsumer.commit()
+ self.messageConsumer.close()
+ # print commitResult
+ print "thread %d start %s package " % (self.threadId, packageName)
+ gbsbuild = GbsBuild(packageName, self.threadId)
+ print "thread %d building %s package" % (self.threadId, packageName)
+ self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.137:9092")
+ if gbsbuild.build() == "Success":
+ #if True:
+ result = self.messageHandler.send("tizen-unified-status", value = "succeed", key = packageName, partition=0)
+ if(result.get(60)):
+ print "send success"
+ else:
+ print "send fail"
+ else:
+ result = self.messageHandler.send("tizen-unified-status", value = "failed", key = packageName, partition=0)
+ if(result.get(60)):
+ print "send success"
+ else:
+ print "send fail"
+ self.messageHandler.close()
+
+class LargeConsumer(Process):
+ def __init__(self, threadId, partition = 0):
+ super(LargeConsumer,self).__init__()
+ self.partition = partition
+ self.threadId = threadId
+
+ def run(self):
+ print "start large consume thread %d , os pid: %d" % (self.threadId, os.getpid())
+ while True:
+ # while WorkerPool.taskQueue.empty() is True:
+ # time.sleep(1)
+ # packageName = WorkerPool.taskQueue.get()
+ self.messageConsumer = KafkaConsumer('tizen-unified-large-packages', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
+ resultDict = self.messageConsumer.poll(36000, 1)
+
+ while not resultDict:
+ resultDict = self.messageConsumer.poll(36000, 1)
+
+ ConsumerRecords = resultDict.values()
+ packageName = ConsumerRecords[0][0].value
+ self.messageConsumer.close()
print "thread %d start %s package " % (self.threadId, packageName)
gbsbuild = GbsBuild(packageName,self.threadId)
print "thread %d building %s package" % (self.threadId, packageName)
class WorkerPool(object):
capcaticy = cpu_count()
curThreadNum = 0
- taskQueue = multiprocessing.Queue(capcaticy*100)
def __init__(self):
- self.specialProducer = None
if(WorkerPool.capcaticy == 8):
- self.specialProducer = SpecialProducer()
- self.producer = Producer()
- self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy)]
+ self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy - 1)]
+ self.consumers.append(LargeConsumer(WorkerPool.capcaticy - 1))
+ else:
+ self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy)]
def start(self):
#set daemon
- self.producer.daemon = True
for i in range(0, WorkerPool.capcaticy):
self.consumers[i].daemon = True
print "start Worker pool"
- self.producer.start()
for i in range(0, WorkerPool.capcaticy):
self.consumers[i].start()
- if(self.specialProducer):
- self.specialProducer.daemon = True
- self.specialProducer.start()
- self.producer.join()
- self.producer.messageHandler.close()
- if(self.specialProducer):
- self.specialProducer.join()
- self.specialProducer.messageHandler.close()
for i in range(0, WorkerPool.capcaticy):
self.consumers[i].join()
wp = WorkerPool()