# "sudo -v" period
use constant SUDOV_PERIOD => 3*60;
use constant SC_NPROCESSORS_ONLN => 84;
+
+# use kafka to send message to other workers
+use Kafka::Connection;
+use Kafka::Producer;
+use Kafka::Consumer;
+use Kafka qw(
+ $DEFAULT_MAX_BYTES
+ $DEFAULT_MAX_NUMBER_OF_OFFSETS
+ $RECEIVE_EARLIEST_OFFSET
+);
+use Scalar::Util qw{
+ blessed
+};
+use Try::Tiny;
+
+
+
my @threads; # TODO: clean up
my @exclude = (); # exclude build packages list
my @repos= (); # rpm repositoies list
my $enable_cluster = 0; # enable cluster building
my $max_partitions = 3; # specify max partitions, which is more than or equal to workers
my %packages_level = (); # save the level of packages, which are calculated by get_top_order() algorithm
+my $connection;
+my $connection1;
+my $producer;
+my $consumer;
GetOptions (
"repository=s" => \@repos,
"arch=s" => \$arch,
$prune = 1;
}
}
-# use kafka to send message to other workers
-use Kafka::Producer;
-my $connection = Kafka::Connection->new( host => '109.123.100.144' );
-my $producer = Kafka::Producer->new( Connection => $connection );
sub writeToKafka {
my $cur_level = shift;
my $partition = 0;
- my @fail_packages;
foreach my $package (@{$packages_level{$cur_level}}) {
$producer->send('tizen-unified',$partition,"$package");
$partition = ($partition+1)%($max_partitions);
}
- return @fail_packages;
-}
+}
+
+sub readFromKafka {
+ my $cur_level = shift;
+ my $partition = 0;
+ my @fail_packages;
+ my @succeed_packages;
+ my $cur_offset = 0;
+ my $packageNum = @{$packages_level{$cur_level}};
+ while(@fail_packages + @succeed_packages < $packageNum ) {
+ my $messages = $consumer->fetch(
+ 'tizen-unified-status',
+ $partition,
+ $cur_offset,
+ $DEFAULT_MAX_BYTES
+ );
+ foreach my $message ( @$messages ) {
+ if ( $message->valid ) {
+ $cur_offset = $message->next_offset;
+ if($message->payload eq "failed") {
+ push(@fail_packages, $message->key);
+ } else {
+ push(@succeed_packages, $message->key);
+ }
+ } else {
+ info($message->error);
+ }
+ }
+ debug("wait for next 0.5s to check");
+ sleep(1); # sleep 0.5s for next check
+ }
+ return @fail_packages;
+}
# MAIN
if ($depends) {
info("start generate packages depends from: " . $package_path . " ($style)");
}
# if build with cluster, not build local
if ($enable_cluster == 1) {
+ try {
+ $connection = Kafka::Connection->new( host => '109.123.100.144' );
+ $producer = Kafka::Producer->new( Connection => $connection );
+ $consumer = Kafka::Consumer->new( Connection => $connection );
+ } catch {
+ my $error = $_;
+ if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
+ warn 'Error: (', $error->code, ') ', $error->message, "\n";
+ exit;
+ } else {
+ die $error;
+ }
+ };
+
my $cur_level = 0;
my $ret = 0;
if($debug) {
print_level_packages();
}
+ my @fail_packages;
+ # when a level packages building return status are all succeed,
+ # then go to next level packages.
while(defined $packages_level{$cur_level}) {
- my @fail_packages = writeToKafka($cur_level);
- if(@fail_packages > 0) {
- while(@fail_packages) {
- print "$_ ";
+ writeToKafka($cur_level);
+ @fail_packages = readFromKafka($cur_level);
+ if( @fail_packages ) {
+ foreach my $p ( @fail_packages ) {
+ print "$p ";
}
print "\n";
error("these @fail_packages packages build failed");
}
+ else {
+ info("@{$packages_level{$cur_level}} packages build succeed");
+ }
$cur_level++;
}
exit $ret;