[dak/master 01/10] Enhance process pool implementation



Signed-off-by: Mark Hymers <mhy@debian.org>
---
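Note for reviewers: with this change, worker functions run under
DakProcessPool are expected to return a (status, message) tuple, where
status is one of the PROC_STATUS_* constants; _func_wrapper converts
uncaught exceptions and received signals into PROC_STATUS_EXCEPTION and
PROC_STATUS_SIGNALRAISED results rather than losing them. A minimal
sketch of a caller, assuming the daklib from this series is importable
(do_work and its argument are illustrative, not part of the patch):

    import sys
    from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS

    def do_work(item):
        # Hand back a PROC_STATUS_* code plus a message for the logs.
        return (PROC_STATUS_SUCCESS, 'processed %s' % item)

    pool = DakProcessPool()
    for item in ('a', 'b'):
        pool.apply_async(do_work, [item])
    pool.close()
    pool.join()            # join() also collects results into pool.results
    sys.exit(pool.overall_status())  # 0 only if every worker succeeded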
 dak/generate_filelist.py          |   57 +++++++++++++++------------
 dak/generate_packages_sources2.py |   35 ++++++++++------
 dak/generate_releases.py          |   33 +++++----------
 dak/show_new.py                   |    4 +-
 daklib/dakmultiprocessing.py      |   77 ++++++++++++++++++++++++++++++------
 tests/test_multiprocessing.py     |   61 +++++++++++++++++++++++++++++
 6 files changed, 192 insertions(+), 75 deletions(-)
 create mode 100755 tests/test_multiprocessing.py
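
The wrapper also installs handlers for SIGHUP, SIGTERM, SIGPIPE, SIGCHLD
and SIGALRM, so a worker hit by one of these signals reports a result
instead of hanging the pool. A sketch of that behaviour under the same
assumptions (suicidal_task is illustrative):

    import os, signal
    from daklib.dakmultiprocessing import DakProcessPool, \
                                          PROC_STATUS_SIGNALRAISED

    def suicidal_task():
        # The handler installed by _func_wrapper raises SignalException,
        # which is caught and reported as (PROC_STATUS_SIGNALRAISED, signum).
        os.kill(os.getpid(), signal.SIGTERM)

    pool = DakProcessPool()
    pool.apply_async(suicidal_task)
    pool.close()
    pool.join()
    assert pool.results == [(PROC_STATUS_SIGNALRAISED, signal.SIGTERM)]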

diff --git a/dak/generate_filelist.py b/dak/generate_filelist.py
index 2a566e0..d015b3e 100755
--- a/dak/generate_filelist.py
+++ b/dak/generate_filelist.py
@@ -39,11 +39,13 @@ Generate file lists for apt-ftparchive.
 from daklib.dbconn import *
 from daklib.config import Config
 from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
 import apt_pkg, os, stat, sys
 
 from daklib.lists import getSources, getBinaries, getArchAll
 
+EXIT_STATUS = 0
+
 def listPath(suite, component, architecture = None, type = None,
         incremental_mode = False):
     """returns full path to the list file"""
@@ -159,7 +161,7 @@ def main():
     Options = cnf.SubTree("Filelist::Options")
     if Options['Help']:
         usage()
-    #pool = Pool()
+    pool = DakProcessPool()
     query_suites = query_suites. \
         filter(Suite.suite_name.in_(utils.split_args(Options['Suite'])))
     query_components = query_components. \
@@ -167,8 +169,15 @@ def main():
     query_architectures = query_architectures. \
         filter(Architecture.arch_string.in_(utils.split_args(Options['Architecture'])))
 
-    def log(message):
-        Logger.log([message])
+    def parse_results(message):
+        # Split out into (code, msg)
+        code, msg = message
+        if code == PROC_STATUS_SUCCESS:
+            Logger.log([msg])
+        elif code == PROC_STATUS_SIGNALRAISED:
+            Logger.log(['E: Subprocess received signal ', msg])
+        else:
+            Logger.log(['E: ', msg])
 
     for suite in query_suites:
         suite_id = suite.suite_id
@@ -179,34 +188,32 @@ def main():
                 if architecture not in suite.architectures:
                     pass
                 elif architecture.arch_string == 'source':
-                    Logger.log([writeSourceList(suite_id, component_id, Options['Incremental'])])
-                    #pool.apply_async(writeSourceList,
-                    #    (suite_id, component_id, Options['Incremental']), callback=log)
+                    pool.apply_async(writeSourceList,
+                        (suite_id, component_id, Options['Incremental']), callback=parse_results)
                 elif architecture.arch_string == 'all':
-                    Logger.log([writeAllList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])])
-                    #pool.apply_async(writeAllList,
-                    #    (suite_id, component_id, architecture_id, 'deb',
-                    #        Options['Incremental']), callback=log)
-                    Logger.log([writeAllList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])])
-                    #pool.apply_async(writeAllList,
-                    #    (suite_id, component_id, architecture_id, 'udeb',
-                    #        Options['Incremental']), callback=log)
+                    pool.apply_async(writeAllList,
+                        (suite_id, component_id, architecture_id, 'deb',
+                            Options['Incremental']), callback=parse_results)
+                    pool.apply_async(writeAllList,
+                        (suite_id, component_id, architecture_id, 'udeb',
+                            Options['Incremental']), callback=parse_results)
                 else: # arch any
-                    Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])])
-                    #pool.apply_async(writeBinaryList,
-                    #    (suite_id, component_id, architecture_id, 'deb',
-                    #        Options['Incremental']), callback=log)
-                    Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])])
-                    #pool.apply_async(writeBinaryList,
-                    #    (suite_id, component_id, architecture_id, 'udeb',
-                    #        Options['Incremental']), callback=log)
-    #pool.close()
-    #pool.join()
+                    pool.apply_async(writeBinaryList,
+                        (suite_id, component_id, architecture_id, 'deb',
+                            Options['Incremental']), callback=parse_results)
+                    pool.apply_async(writeBinaryList,
+                        (suite_id, component_id, architecture_id, 'udeb',
+                            Options['Incremental']), callback=parse_results)
+    pool.close()
+    pool.join()
+
     # this script doesn't change the database
     session.close()
 
     Logger.close()
 
+    sys.exit(pool.overall_status())
+
 if __name__ == '__main__':
     main()
 
diff --git a/dak/generate_packages_sources2.py b/dak/generate_packages_sources2.py
index eea799f..a7efea2 100755
--- a/dak/generate_packages_sources2.py
+++ b/dak/generate_packages_sources2.py
@@ -31,11 +31,13 @@ Generate Packages/Sources files
 from daklib.dbconn import *
 from daklib.config import Config
 from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
 from daklib.filewriter import PackagesFileWriter, SourcesFileWriter
 
 import apt_pkg, os, stat, sys
 
+EXIT_STATUS = 0
+
 def usage():
     print """Usage: dak generate-packages-sources2 [OPTIONS]
 Generate the Packages/Sources files
@@ -263,28 +265,35 @@ def main():
 
     component_ids = [ c.component_id for c in session.query(Component).all() ]
 
-    def log(details):
-        logger.log(details)
-
-    #pool = Pool()
+    def parse_results(message):
+        # Split out into (code, msg)
+        code, msg = message
+        if code == PROC_STATUS_SUCCESS:
+            logger.log([msg])
+        elif code == PROC_STATUS_SIGNALRAISED:
+            logger.log(['E: Subprocess received signal ', msg])
+        else:
+            logger.log(['E: ', msg])
+
+    pool = DakProcessPool()
     for s in suites:
         if s.untouchable and not force:
             utils.fubar("Refusing to touch %s (untouchable and not forced)" % s.suite_name)
         for c in component_ids:
-            logger.log(generate_sources(s.suite_id, c))
-            #pool.apply_async(generate_sources, [s.suite_id, c], callback=log)
+            pool.apply_async(generate_sources, [s.suite_id, c], callback=parse_results)
             for a in s.architectures:
-                logger.log(generate_packages(s.suite_id, c, a.arch_id, 'deb'))
-                #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=log)
-                logger.log(generate_packages(s.suite_id, c, a.arch_id, 'udeb'))
-                #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=log)
+                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=parse_results)
+                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=parse_results)
+
+    pool.close()
+    pool.join()
 
-    #pool.close()
-    #pool.join()
     # this script doesn't change the database
     session.close()
 
     logger.close()
 
+    sys.exit(pool.overall_status())
+
 if __name__ == '__main__':
     main()
diff --git a/dak/generate_releases.py b/dak/generate_releases.py
index 7f5a996..6c4fed0 100755
--- a/dak/generate_releases.py
+++ b/dak/generate_releases.py
@@ -40,7 +40,6 @@ import bz2
 import apt_pkg
 from tempfile import mkstemp, mkdtemp
 import commands
-from multiprocessing import Pool, TimeoutError
 from sqlalchemy.orm import object_session
 
 from daklib import utils, daklog
@@ -48,10 +47,10 @@ from daklib.regexes import re_gensubrelease, re_includeinrelease
 from daklib.dak_exceptions import *
 from daklib.dbconn import *
 from daklib.config import Config
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS
 
 ################################################################################
 Logger = None                  #: Our logging object
-results = []                   #: Results of the subprocesses
 
 ################################################################################
 
@@ -74,11 +73,6 @@ SUITE can be a space seperated list, e.g.
 
 ########################################################################
 
-def get_result(arg):
-    global results
-    if arg:
-        results.append(arg)
-
 def sign_release_dir(suite, dirname):
     cnf = Config()
 
@@ -290,7 +284,7 @@ class ReleaseWriter(object):
 
 
 def main ():
-    global Logger, results
+    global Logger
 
     cnf = Config()
 
@@ -325,10 +319,8 @@ def main ():
         suites = session.query(Suite).filter(Suite.untouchable == False).all()
 
     broken=[]
-    # For each given suite, run one process
-    results = []
 
-    pool = Pool()
+    pool = DakProcessPool()
 
     for s in suites:
         # Setup a multiprocessing Pool. As many workers as we have CPU cores.
@@ -344,12 +336,10 @@ def main ():
     pool.close()
     pool.join()
 
-    retcode = 0
+    retcode = pool.overall_status()
 
-    if len(results) > 0:
-        Logger.log(['Release file generation broken: %s' % (results)])
-        print "Release file generation broken:\n", '\n'.join(results)
-        retcode = 1
+    if retcode > 0:
+        Logger.log(['Release file generation broken: %s' % (pool.results)])
 
     Logger.close()
 
@@ -361,13 +351,12 @@ def generate_helper(suite_id):
     '''
     session = DBConn().session()
     suite = Suite.get(suite_id, session)
-    try:
-        rw = ReleaseWriter(suite)
-        rw.generate_release_files()
-    except Exception, e:
-        return str(e)
 
-    return
+    # We let the process pool's wrapper catch and deal with any exceptions
+    rw = ReleaseWriter(suite)
+    rw.generate_release_files()
+
+    return (PROC_STATUS_SUCCESS, 'Release file written for %s' % suite.suite_name)
 
 #######################################################################################
 
diff --git a/dak/show_new.py b/dak/show_new.py
index 84b4507..7396d3c 100755
--- a/dak/show_new.py
+++ b/dak/show_new.py
@@ -37,7 +37,7 @@ from daklib.regexes import re_source_ext
 from daklib.config import Config
 from daklib import daklog
 from daklib.changesutils import *
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool
 
 # Globals
 Cnf = None
@@ -250,7 +250,7 @@ def main():
 
     examine_package.use_html=1
 
-    pool = Pool()
+    pool = DakProcessPool()
     for changes_file in changes_files:
         changes_file = utils.validate_changes_file_arg(changes_file, 0)
         if not changes_file:
diff --git a/daklib/dakmultiprocessing.py b/daklib/dakmultiprocessing.py
index 8e66cfd..57152bf 100644
--- a/daklib/dakmultiprocessing.py
+++ b/daklib/dakmultiprocessing.py
@@ -25,32 +25,83 @@ multiprocessing for DAK
 
 ###############################################################################
 
-import multiprocessing
+from multiprocessing.pool import Pool
+from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGCHLD, SIGALRM
+
 import sqlalchemy.orm.session
 
+__all__ = []
+
+PROC_STATUS_SUCCESS      = 0  # Everything ok
+PROC_STATUS_EXCEPTION    = 1  # An exception was caught
+PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
+PROC_STATUS_MISCFAILURE  = 3  # Process specific error; see message
+
+__all__.extend(['PROC_STATUS_SUCCESS',      'PROC_STATUS_EXCEPTION',
+                'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
+
+class SignalException(Exception):
+    def __init__(self, signum):
+        self.signum = signum
+
+    def __str__(self):
+        return "<SignalException: %d>" % self.signum
+
+__all__.append('SignalException')
+
+def signal_handler(signum, info):
+    raise SignalException(signum)
+
 def _func_wrapper(func, *args, **kwds):
+    # We need to handle signals to avoid hanging
+    signal(SIGHUP, signal_handler)
+    signal(SIGTERM, signal_handler)
+    signal(SIGPIPE, signal_handler)
+    signal(SIGCHLD, signal_handler)
+    signal(SIGALRM, signal_handler)
+
+    # We expect the wrapped function to return:
+    # (status, message)
+    # where:
+    #  status is one of PROC_STATUS_*
+    #  message is a string used for logging
     try:
-        return func(*args, **kwds)
+        return func(*args, **kwds)
+    except SignalException, e:
+        return (PROC_STATUS_SIGNALRAISED, e.signum)
+    except Exception, e:
+        return (PROC_STATUS_EXCEPTION, str(e))
     finally:
         # Make sure connections are closed. We might die otherwise.
         sqlalchemy.orm.session.Session.close_all()
 
-class Pool():
+
+class DakProcessPool(Pool):
     def __init__(self, *args, **kwds):
-        self.pool = multiprocessing.Pool(*args, **kwds)
+        Pool.__init__(self, *args, **kwds)
         self.results = []
+        self.int_results = []
 
     def apply_async(self, func, args=(), kwds={}, callback=None):
         wrapper_args = list(args)
         wrapper_args.insert(0, func)
-        self.results.append(self.pool.apply_async(_func_wrapper, wrapper_args, kwds, callback))
-
-    def close(self):
-        self.pool.close()
+        self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
 
     def join(self):
-        self.pool.join()
-        #for r in self.results:
-        #    # return values were already handled in the callbacks, but asking
-        #    # for them might raise exceptions which would otherwise be lost
-        #    r.get()
+        Pool.join(self)
+        for r in self.int_results:
+            # return values were already handled in the callbacks, but asking
+            # for them might raise exceptions which would otherwise be lost
+            self.results.append(r.get())
+
+    def overall_status(self):
+        # Return the highest of our status results
+        # This basically allows us to do sys.exit(overall_status()) and have us
+        # exit 0 if everything was good and non-zero if not
+        status = 0
+        for r in self.results:
+            if r[0] > status:
+                status = r[0]
+        return status
+
+__all__.append('DakProcessPool')
diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py
new file mode 100755
index 0000000..c67e51f
--- /dev/null
+++ b/tests/test_multiprocessing.py
@@ -0,0 +1,61 @@
+#!/usr/bin/python
+
+from base_test import DakTestCase
+
+from daklib.dakmultiprocessing import DakProcessPool, \
+                                      PROC_STATUS_SUCCESS,   PROC_STATUS_MISCFAILURE, \
+                                      PROC_STATUS_EXCEPTION, PROC_STATUS_SIGNALRAISED 
+import signal
+
+def test_function(num, num2):
+    from os import kill, getpid
+
+    if num == 1:
+        sigs = [signal.SIGTERM, signal.SIGCHLD, signal.SIGALRM, signal.SIGHUP]
+        kill(getpid(), sigs[num2])
+
+    if num2 == 3:
+        raise Exception('Test uncaught exception handling')
+
+    if num == 0 and num2 == 1:
+        return (PROC_STATUS_MISCFAILURE, 'Test custom error return')
+
+    return (PROC_STATUS_SUCCESS, 'blah, %d, %d' % (num, num2))
+
+class DakProcessPoolTestCase(DakTestCase):
+    def testPool(self):
+        def alarm_handler(signum, frame):
+            raise AssertionError('Timed out')
+
+        # Shouldn't take us more than 15 seconds to run this test
+        signal.signal(signal.SIGALRM, alarm_handler)
+        signal.alarm(15)
+
+        p = DakProcessPool()
+        for s in range(3):
+            for j in range(4):
+                p.apply_async(test_function, [s, j])
+
+        p.close()
+        p.join()
+
+        signal.alarm(0)
+        signal.signal(signal.SIGALRM, signal.SIG_DFL)
+
+        expected = [(PROC_STATUS_SUCCESS,      'blah, 0, 0'),
+                    (PROC_STATUS_MISCFAILURE,  'Test custom error return'),
+                    (PROC_STATUS_SUCCESS,      'blah, 0, 2'),
+                    (PROC_STATUS_EXCEPTION,    'Test uncaught exception handling'),
+                    (PROC_STATUS_SIGNALRAISED, 15),
+                    (PROC_STATUS_SIGNALRAISED, 17),
+                    (PROC_STATUS_SIGNALRAISED, 14),
+                    (PROC_STATUS_SIGNALRAISED, 1),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 0'),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 1'),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 2'),
+                    (PROC_STATUS_EXCEPTION,    'Test uncaught exception handling')]
+
+        self.assertEqual(len(p.results), len(expected))
+
+        for r in range(len(p.results)):
+            self.assertEqual(p.results[r], expected[r])
-- 
1.7.2.5


