From: jingui.ren Date: Fri, 28 Dec 2018 02:09:23 +0000 (+0800) Subject: add read message from kafka feature X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=221e8161aee6c0656c6ec361595f95c12e7b2303;p=tools%2Fdepanneur.git add read message from kafka feature Change-Id: I2740b87a44bf1f3b7baca29df013a6ec814bf7b0 --- diff --git a/depanneur b/depanneur index 697fe8e..21e75ee 100755 --- a/depanneur +++ b/depanneur @@ -79,6 +79,23 @@ use File::Basename; # "sudo -v" period use constant SUDOV_PERIOD => 3*60; use constant SC_NPROCESSORS_ONLN => 84; + +# use kafka to send message to other workers +use Kafka::Connection; +use Kafka::Producer; +use Kafka::Consumer; +use Kafka qw( + $DEFAULT_MAX_BYTES + $DEFAULT_MAX_NUMBER_OF_OFFSETS + $RECEIVE_EARLIEST_OFFSET +); +use Scalar::Util qw{ + blessed +}; +use Try::Tiny; + + + my @threads; # TODO: clean up my @exclude = (); # exclude build packages list my @repos= (); # rpm repositoies list @@ -192,6 +209,10 @@ my $reverse_on = 1; #enable reverse dependency my $enable_cluster = 0; # enable cluster building my $max_partitions = 3; # specify max partitions, which is more than or equal to workers my %packages_level = (); # save the level of packages, which are calculated by get_top_order() algorithm +my $connection; +my $connection1; +my $producer; +my $consumer; GetOptions ( "repository=s" => \@repos, "arch=s" => \$arch, @@ -2518,20 +2539,46 @@ sub dir_wanted { $prune = 1; } } -# use kafka to send message to other workers -use Kafka::Producer; -my $connection = Kafka::Connection->new( host => '109.123.100.144' ); -my $producer = Kafka::Producer->new( Connection => $connection ); sub writeToKafka { my $cur_level = shift; my $partition = 0; - my @fail_packages; foreach my $package (@{$packages_level{$cur_level}}) { $producer->send('tizen-unified',$partition,"$package"); $partition = ($partition+1)%($max_partitions); } - return @fail_packages; -} +} + +sub readFromKafka { + my $cur_level = shift; + my $partition = 0; + my @fail_packages; + my @succeed_packages; + my $cur_offset = 0; + my $packageNum = @{$packages_level{$cur_level}}; + while(@fail_packages + @succeed_packages < $packageNum ) { + my $messages = $consumer->fetch( + 'tizen-unified-status', + $partition, + $cur_offset, + $DEFAULT_MAX_BYTES + ); + foreach my $message ( @$messages ) { + if ( $message->valid ) { + $cur_offset = $message->next_offset; + if($message->payload eq "failed") { + push(@fail_packages, $message->key); + } else { + push(@succeed_packages, $message->key); + } + } else { + info($message->error); + } + } + debug("wait for next 0.5s to check"); + sleep(1); # sleep 0.5s for next check + } + return @fail_packages; +} # MAIN if ($depends) { info("start generate packages depends from: " . $package_path . " ($style)"); @@ -2816,20 +2863,41 @@ for my $pkg (`find "$srpm_repo_path" -type f -name "*.rpm" 2>/dev/null`) { } # if build with cluster, not build local if ($enable_cluster == 1) { + try { + $connection = Kafka::Connection->new( host => '109.123.100.144' ); + $producer = Kafka::Producer->new( Connection => $connection ); + $consumer = Kafka::Consumer->new( Connection => $connection ); + } catch { + my $error = $_; + if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) { + warn 'Error: (', $error->code, ') ', $error->message, "\n"; + exit; + } else { + die $error; + } + }; + my $cur_level = 0; my $ret = 0; if($debug) { print_level_packages(); } + my @fail_packages; + # when a level packages building return status are all succeed, + # then go to next level packages. while(defined $packages_level{$cur_level}) { - my @fail_packages = writeToKafka($cur_level); - if(@fail_packages > 0) { - while(@fail_packages) { - print "$_ "; + writeToKafka($cur_level); + @fail_packages = readFromKafka($cur_level); + if( @fail_packages ) { + foreach my $p ( @fail_packages ) { + print "$p "; } print "\n"; error("these @fail_packages packages build failed"); } + else { + info("@{$packages_level{$cur_level}} packages build succeed"); + } $cur_level++; } exit $ret;