[Date Prev][Date Next] [Thread Prev][Thread Next] [Date Index] [Thread Index]

[dak/master 3/4] I was trying to interrupt this with the keyboard and failing



---
 dak/import_known_changes.py |  114 ++++++++++++++++++++++++++++---------------
 daklib/changes.py           |    2 +-
 2 files changed, 75 insertions(+), 41 deletions(-)

diff --git a/dak/import_known_changes.py b/dak/import_known_changes.py
index b403a80..5aa9a58 100755
--- a/dak/import_known_changes.py
+++ b/dak/import_known_changes.py
@@ -66,6 +66,8 @@ OPTIONS
     sys.exit(exit_code)
 
 def check_signature (sig_filename, data_filename=""):
+    fingerprint = None
+
     keyrings = [
         "/home/joerg/keyring/keyrings/debian-keyring.gpg",
         "/home/joerg/keyring/keyrings/debian-keyring.pgp",
@@ -127,10 +129,17 @@ class OneAtATime(object):
     def __init__(self):
         self.next_in_line = None
         self.next_lock = threading.Condition()
+        self.die = False
+
+    def plsDie(self):
+        self.die = True
+        self.next_lock.notify()
 
     def enqueue(self, next):
         self.next_lock.acquire()
         while self.next_in_line:
+            if self.die:
+                return
             self.next_lock.wait()
 
         assert( not self.next_in_line )
@@ -141,15 +150,19 @@ class OneAtATime(object):
     def dequeue(self):
         self.next_lock.acquire()
         while not self.next_in_line:
+            if self.die:
+                return
             self.next_lock.wait()
-        result = self.next_in_line
 
-        if isinstance(result, EndOfChanges):
-            return None
+        result = self.next_in_line
 
         self.next_in_line = None
         self.next_lock.notify()
         self.next_lock.release()
+
+        if isinstance(result, EndOfChanges):
+            return None
+
         return result
 
 class ChangesToImport(object):
@@ -164,10 +177,11 @@ class ChangesToImport(object):
 
 class ChangesGenerator(threading.Thread):
     """enqueues changes files to be imported"""
-    def __init__(self, queue):
+    def __init__(self, parent, queue):
         threading.Thread.__init__(self)
         self.queue = queue
         self.session = DBConn().session()
+        self.parent = parent
         self.die = False
 
     def plsDie(self):
@@ -185,26 +199,32 @@ class ChangesGenerator(threading.Thread):
                     if not filenames:
                         # Empty directory (or only subdirectories), next
                         continue
-                    if self.die:
-                        return
 
                     for changesfile in filenames:
-                        if not changesfile.endswith(".changes"):
-                            # Only interested in changes files.
-                            continue
-                        count += 1
-
-                        if not get_knownchange(changesfile, self.session):
-                            to_import = ChangesToImport(dirpath, changesfile, count)
-                            self.queue.enqueue(to_import)
+                        try:
+                            if not changesfile.endswith(".changes"):
+                                # Only interested in changes files.
+                                continue
+                            count += 1
+
+                            if not get_knownchange(changesfile, self.session):
+                                to_import = ChangesToImport(dirpath, changesfile, count)
+                                if self.die:
+                                    return
+                                self.queue.enqueue(to_import)
+                        except KeyboardInterrupt:
+                            print("got Ctrl-c in enqueue thread.  terminating")
+                            self.parent.plsDie()
+                            sys.exit(1)
 
         self.queue.enqueue(EndOfChanges())
 
 class ImportThread(threading.Thread):
-    def __init__(self, queue):
+    def __init__(self, parent, queue):
         threading.Thread.__init__(self)
         self.queue = queue
         self.session = DBConn().session()
+        self.parent = parent
         self.die = False
 
     def plsDie(self):
@@ -231,17 +251,49 @@ class ImportThread(threading.Thread):
 
             except InvalidDscError, line:
                 warn("syntax error in .dsc file '%s', line %s." % (f, line))
-#                failure += 1
 
             except ChangesUnicodeError:
                 warn("found invalid changes file, not properly utf-8 encoded")
-#                failure += 1
-
-                print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile)
-
 
+                
+            except KeyboardInterrupt:
+                print("Caught C-c; on ImportThread. terminating.")
+                self.parent.plsDie()
+                sys.exit(1)
+                print("STUSTUSTUSTUSTU")
+                return
             except:
                 traceback.print_exc()
+                self.parent.plsDie()
+                sys.exit(1)
+
+class ImportKnownChanges(object):
+    def __init__(self,num_threads):
+        self.queue = OneAtATime()
+        self.threads = [ ChangesGenerator(self,self.queue) ]
+
+        for i in range(num_threads):
+            self.threads.append( ImportThread(self,self.queue) )
+
+        try:
+            for thread in self.threads:
+                thread.start()
+
+        except KeyboardInterrupt:
+            print("Caught C-c; terminating.")
+            utils.warn("Caught C-c; terminating.")
+            self.plsDie()
+
+    def plsDie(self):
+        traceback.print_stack90
+        for thread in self.threads:
+            print( "STU: before ask %s to die" % thread )
+            thread.plsDie()
+            print( "STU: after ask %s to die" % thread )
+
+        self.threads=[]
+        sys.exit(1)
+        
 
 def main():
     cnf = Config()
@@ -277,28 +329,10 @@ def main():
     if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")):
         num_threads = int(Config()[ "%s::%s" %(options_prefix,"Concurrency")])
 
+    ImportKnownChanges(num_threads)
 
-    queue = OneAtATime()
-    threads = [ ChangesGenerator(queue) ]
-
-    for i in range(num_threads):
-        threads.append( ImportThread(queue) )
-
-    try:
-        for thread in threads:
-            thread.start()
-
-        for thread in thrads:
-            thread.join()
-
-    except KeyboardInterrupt:
-        utils.warn("Caught C-c; terminating.")
-        for thread in threads:
-            thread.plsDie()
-
-        for thread in threads:
-            thread.join()
 
+        
 
 if __name__ == '__main__':
     main()
diff --git a/daklib/changes.py b/daklib/changes.py
index 90ce231..3eb842d 100755
--- a/daklib/changes.py
+++ b/daklib/changes.py
@@ -192,7 +192,7 @@ class Changes(object):
 
     def mark_missing_fields(self):
         """add "missing" in fields which we will require for the known_changes table"""
-        for key in ['urgency', 'maintainer', 'fingerprint', 'changedby' ]:
+        for key in ['urgency', 'maintainer', 'fingerprint', 'changed-by' ]:
             if (not self.changes.has_key(key)) or (not self.changes[key]):
                 self.changes[key]='missing'
 
-- 
1.6.3.3



Reply to: