Make 'repo sync -jN' exit with an error code in the case of sync errors.

The bug that this is fixing is described here:

http://code.google.com/p/chromium-os/issues/detail?id=6813

This fix lets the helper threads signal the main thread when they hit an
error.  Once the main thread sees the error, it stops starting new threads,
lets the threads that are already running finish, and then exits with an
error.
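
A minimal, standalone sketch of this coordination pattern (hypothetical
names, not the repo code itself): each worker sets a shared threading.Event
when it fails, and the dispatcher stops launching new workers once the event
is set, joins the workers already running, and exits non-zero.

  import sys
  import threading

  def _worker(task, sem, err_event):
    # Hypothetical worker: run one task, flag any failure, always free a slot.
    try:
      if not task():
        err_event.set()
    except Exception:
      err_event.set()
      raise
    finally:
      sem.release()

  def _dispatch(tasks, jobs=4):
    sem = threading.Semaphore(jobs)
    err_event = threading.Event()
    threads = []
    for task in tasks:
      # Stop launching new workers once any worker has reported an error.
      if err_event.is_set():
        break
      sem.acquire()
      t = threading.Thread(target=_worker, args=(task, sem, err_event))
      threads.append(t)
      t.start()
    # Let the workers that are already running finish before deciding.
    for t in threads:
      t.join()
    if err_event.is_set():
      print >>sys.stderr, 'error: exiting due to fetch errors'
      sys.exit(1)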

Change-Id: If3019bc6b0b3ab9304d49ed2eea53e9d57f3095a
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 36ef16d..eac0556 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -39,6 +39,10 @@
 from project import SyncBuffer
 from progress import Progress
 
+class _FetchError(Exception):
+  """Internal error thrown in _FetchHelper() when we don't want stack trace."""
+  pass
+
 class Sync(Command, MirrorSafeCommand):
   jobs = 1
   common = True
@@ -135,20 +139,60 @@
                  dest='repo_upgraded', action='store_true',
                  help=SUPPRESS_HELP)
 
-  def _FetchHelper(self, opt, project, lock, fetched, pm, sem):
-      if not project.Sync_NetworkHalf(quiet=opt.quiet):
-        print >>sys.stderr, 'error: Cannot fetch %s' % project.name
-        if opt.force_broken:
-          print >>sys.stderr, 'warn: --force-broken, continuing to sync'
-        else:
-          sem.release()
-          sys.exit(1)
+  def _FetchHelper(self, opt, project, lock, fetched, pm, sem, err_event):
+      """Main function of the fetch threads when jobs are > 1.
+
+      Args:
+        opt: Program options returned from optparse.  See _Options().
+        project: Project object for the project to fetch.
+        lock: Lock for accessing objects that are shared amongst multiple
+            _FetchHelper() threads.
+        fetched: set object that we will add project.gitdir to when we're done
+            (with our lock held).
+        pm: Instance of a Progress object.  We will call pm.update() (with our
+            lock held).
+        sem: We'll release() this semaphore when we exit so that another thread
+            can be started up.
+        err_event: We'll set this event in the case of an error (after printing
+            out info about the error).
+      """
+      # We'll set this to True once we've acquired the lock.
+      did_lock = False
+
+      # Encapsulate everything in a try/except/finally so that:
+      # - We always set err_event in the case of an exception.
+      # - We always make sure we call sem.release().
+      # - We always make sure we unlock the lock if we locked it.
+      try:
+        success = project.Sync_NetworkHalf(quiet=opt.quiet)
+
+        # Lock around all the rest of the code, since printing, updating a set
+        # and Progress.update() are not thread safe.
+        lock.acquire()
+        did_lock = True
+
+        if not success:
+          print >>sys.stderr, 'error: Cannot fetch %s' % project.name
+          if opt.force_broken:
+            print >>sys.stderr, 'warn: --force-broken, continuing to sync'
+          else:
+            raise _FetchError()
 
-      lock.acquire()
-      fetched.add(project.gitdir)
-      pm.update()
-      lock.release()
-      sem.release()
+        fetched.add(project.gitdir)
+        pm.update()
+      except BaseException, e:
+        # Notify the _Fetch() function about all errors.
+        err_event.set()
+
+        # If we got our own _FetchError, we don't want a stack trace.
+        # However, if we got something else (something in Sync_NetworkHalf?),
+        # we'd like one (so re-raise after we've set err_event).
+        if not isinstance(e, _FetchError):
+          raise
+      finally:
+        if did_lock:
+          lock.release()
+        sem.release()
 
   def _Fetch(self, projects, opt):
     fetched = set()
@@ -169,7 +213,13 @@
       threads = set()
       lock = _threading.Lock()
       sem = _threading.Semaphore(self.jobs)
+      err_event = _threading.Event()
       for project in projects:
+        # Check for any errors before starting any new threads.
+        # ...we'll let existing threads finish, though.
+        if err_event.is_set():
+          break
+
         sem.acquire()
         t = _threading.Thread(target = self._FetchHelper,
                               args = (opt,
@@ -177,13 +227,19 @@
                                       lock,
                                       fetched,
                                       pm,
-                                      sem))
+                                      sem,
+                                      err_event))
         threads.add(t)
         t.start()
 
       for t in threads:
         t.join()
 
+      # If we saw an error, exit with code 1 so that calling scripts can detect it.
+      if err_event.is_set():
+        print >>sys.stderr, '\nerror: Exited sync due to fetch errors'
+        sys.exit(1)
+
     pm.end()
     for project in projects:
       project.bare_git.gc('--auto')