add read message from kafka feature
authorjingui.ren <jingui.ren@samsung.com>
Fri, 28 Dec 2018 02:09:23 +0000 (10:09 +0800)
committerjingui.ren <jingui.ren@samsung.com>
Fri, 28 Dec 2018 02:09:23 +0000 (10:09 +0800)
Change-Id: I2740b87a44bf1f3b7baca29df013a6ec814bf7b0

depanneur

index 697fe8e5c37b6dc0c262a71c09e5729a5037b683..21e75eea777896c65ce4b3af6fe44dd9a46c1ee9 100755 (executable)
--- a/depanneur
+++ b/depanneur
@@ -79,6 +79,23 @@ use File::Basename;
 # "sudo -v" period
 use constant SUDOV_PERIOD => 3*60;
 use constant SC_NPROCESSORS_ONLN => 84;
+
+# use kafka to send message to other workers
+use Kafka::Connection;
+use Kafka::Producer;
+use Kafka::Consumer;
+use Kafka qw(
+    $DEFAULT_MAX_BYTES
+    $DEFAULT_MAX_NUMBER_OF_OFFSETS
+    $RECEIVE_EARLIEST_OFFSET
+);
+use Scalar::Util qw{
+    blessed
+};
+use Try::Tiny;
+
+
+
 my @threads;                    # TODO: clean up
 my @exclude = ();               # exclude build packages list
 my @repos= ();                  # rpm repositoies list
@@ -192,6 +209,10 @@ my $reverse_on = 1; #enable reverse dependency
 my $enable_cluster = 0; # enable cluster building
 my $max_partitions = 3; # specify max partitions, which is more than or equal to workers
 my %packages_level = (); # save the level of packages, which are calculated by get_top_order() algorithm
+my $connection;
+my $connection1;
+my $producer; 
+my $consumer;
 GetOptions (
     "repository=s" => \@repos,
     "arch=s" => \$arch,
@@ -2518,20 +2539,46 @@ sub dir_wanted {
                $prune = 1;
        }
 }
-# use kafka to send message to other workers
-use Kafka::Producer;
-my $connection = Kafka::Connection->new( host => '109.123.100.144' );
-my $producer = Kafka::Producer->new( Connection => $connection );
 sub writeToKafka {
     my $cur_level = shift;
     my $partition = 0;
-    my @fail_packages;
     foreach my $package (@{$packages_level{$cur_level}}) {
         $producer->send('tizen-unified',$partition,"$package");
                $partition = ($partition+1)%($max_partitions);
        }
-       return @fail_packages;
-} 
+}
+
+sub readFromKafka {
+    my $cur_level = shift;
+    my $partition = 0;
+    my @fail_packages;
+    my @succeed_packages;
+    my $cur_offset = 0;
+    my $packageNum = @{$packages_level{$cur_level}};
+    while(@fail_packages + @succeed_packages < $packageNum ) {
+        my $messages = $consumer->fetch(
+            'tizen-unified-status',
+            $partition,
+            $cur_offset,
+            $DEFAULT_MAX_BYTES
+        );
+        foreach my $message ( @$messages ) {
+            if ( $message->valid ) {
+                $cur_offset = $message->next_offset;
+                if($message->payload eq "failed") {
+                    push(@fail_packages, $message->key);
+                } else {
+                    push(@succeed_packages, $message->key);
+                }
+            } else {
+                info($message->error);
+            }
+        }
+        debug("wait for next 0.5s to check");
+        sleep(1); # sleep 0.5s for next check
+    }
+    return @fail_packages;
+}
 # MAIN
 if ($depends) {
     info("start generate packages depends from: " . $package_path . " ($style)");
@@ -2816,20 +2863,41 @@ for my $pkg (`find "$srpm_repo_path" -type f -name "*.rpm" 2>/dev/null`) {
 }
 # if build with cluster, not build local
 if ($enable_cluster == 1) {
+    try {
+        $connection = Kafka::Connection->new( host => '109.123.100.144' );
+        $producer = Kafka::Producer->new( Connection => $connection );
+        $consumer = Kafka::Consumer->new( Connection => $connection );
+    } catch {
+        my $error = $_;
+        if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
+            warn 'Error: (', $error->code, ') ',  $error->message, "\n";
+            exit;
+        } else {
+            die $error;
+        }
+    };
+
        my $cur_level = 0;
        my $ret = 0;
     if($debug) {
         print_level_packages();
     }
+    my @fail_packages;
+    # when a level packages building return status are all succeed,
+    # then go to next level packages.
        while(defined $packages_level{$cur_level}) {
-               my @fail_packages = writeToKafka($cur_level);
-               if(@fail_packages > 0) {
-                       while(@fail_packages) {
-                               print "$_ ";
+               writeToKafka($cur_level);
+        @fail_packages = readFromKafka($cur_level);
+               if( @fail_packages ) {
+                       foreach my $p ( @fail_packages ) {
+                               print "$p ";
             }
                        print "\n";
                        error("these @fail_packages packages build failed");
                }
+               else {
+                       info("@{$packages_level{$cur_level}} packages build succeed");
+               }
                $cur_level++;
        }
        exit $ret;