[Date Prev][Date Next] [Thread Prev][Thread Next] [Date Index] [Thread Index]

[dak/master] importing old changes files into known_changes with a separate script, now allowing some fields to be 'missing'



---
 dak/dak.py                  |    2 +
 dak/import_known_changes.py |  275 +++++++++++++++++++++++++++++++++++++++++++
 daklib/changes.py           |   11 ++-
 3 files changed, 287 insertions(+), 1 deletions(-)
 create mode 100755 dak/import_known_changes.py

diff --git a/dak/dak.py b/dak/dak.py
index f9839ea..e424836 100755
--- a/dak/dak.py
+++ b/dak/dak.py
@@ -134,6 +134,8 @@ def init():
          "Generate statistics"),
         ("bts-categorize",
          "Categorize uncategorized bugs filed against ftp.debian.org"),
+        ("import-known-changes",
+         "import old changes files into known_changes table"),
         ("add-user",
          "Add a user to the archive"),
         ]
diff --git a/dak/import_known_changes.py b/dak/import_known_changes.py
new file mode 100755
index 0000000..d95ad2c
--- /dev/null
+++ b/dak/import_known_changes.py
@@ -0,0 +1,275 @@
+#!/usr/bin/env python
+# coding=utf8
+
+"""
+Import known_changes files
+
+@contact: Debian FTP Master <ftpmaster@debian.org>
+@copyright: 2009  Mike O'Connor <stew@debian.org>
+@license: GNU General Public License version 2 or later
+"""
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+################################################################################
+
+
+################################################################################
+
+import sys
+import os
+import logging
+import threading
+from daklib.dbconn import DBConn,get_knownchange
+
+from daklib.config import Config
+
+# where in dak.conf all of our configuration will be stowed
+options_prefix = "KnownChanges"
+options_prefix = "%s::Options" % options_prefix
+
+log = logging.getLogger()
+
+################################################################################
+
+
+def usage (exit_code=0):
+    print """Usage: dak import-known-changes [options]
+
+OPTIONS
+     -j n
+        run with n threads concurrently
+
+     -v, --verbose
+        show verbose information messages
+
+     -q, --quiet
+        supress all output but errors
+
+"""
+    sys.exit(exit_code)
+
+def check_signature (sig_filename, data_filename=""):
+    keyrings = [
+        "/home/joerg/keyring/keyrings/debian-keyring.gpg",
+        "/home/joerg/keyring/keyrings/debian-keyring.pgp",
+        "/home/joerg/keyring/keyrings/debian-maintainers.gpg",
+        "/home/joerg/keyring/keyrings/debian-role-keys.gpg",
+        "/home/joerg/keyring/keyrings/emeritus-keyring.pgp",
+        "/home/joerg/keyring/keyrings/emeritus-keyring.gpg",
+        "/home/joerg/keyring/keyrings/removed-keys.gpg",
+        "/home/joerg/keyring/keyrings/removed-keys.pgp"
+        ]
+
+    keyringargs = " ".join(["--keyring %s" % x for x in keyrings ])
+
+    # Build the command line
+    status_read, status_write = os.pipe()
+    cmd = "gpgv --status-fd %s %s %s" % (status_write, keyringargs, sig_filename)
+
+    # Invoke gpgv on the file
+    (output, status, exit_status) = gpgv_get_status_output(cmd, status_read, status_write)
+
+    # Process the status-fd output
+    (keywords, internal_error) = process_gpgv_output(status)
+
+    # If we failed to parse the status-fd output, let's just whine and bail now
+    if internal_error:
+        warn("Couldn't parse signature")
+        return None
+
+    # usually one would check for bad things here. We, however, do not care.
+
+    # Next check gpgv exited with a zero return code
+    if exit_status:
+        warn("Couldn't parse signature")
+        return None
+
+    # Sanity check the good stuff we expect
+    if not keywords.has_key("VALIDSIG"):
+        warn("Couldn't parse signature")
+    else:
+        args = keywords["VALIDSIG"]
+        if len(args) < 1:
+            warn("Couldn't parse signature")
+        else:
+            fingerprint = args[0]
+
+    return fingerprint
+
+
+class EndOfChanges(object):
+    """something enqueued to signify the last change"""
+    pass
+
+
+class OneAtATime(object):
+    """
+    a one space queue which sits between multiple possible producers
+    and multiple possible consumers
+    """
+    def __init__(self):
+        self.next_in_line = None
+        self.next_lock = threading.Condition()
+
+    def enqueue(self, next):
+        self.next_lock.acquire()
+        while self.next_in_line:
+            self.next_lock.wait()
+
+        assert( not self.next_in_line )
+        self.next_in_line = next
+        self.next_lock.notify()
+        self.next_lock.release()
+
+    def dequeue(self):
+        self.next_lock.acquire()
+        while not self.next_in_line:
+            self.next_lock.wait()
+        result = self.next_in_line
+
+        if isinstance(next, EndOfChanges):
+            return None
+
+        self.next_in_line = None
+        self.next_lock.notify()
+        self.next_lock.release()
+        return result
+
+class ChangesToImport(object):
+    """A changes file to be enqueued to be processed"""
+    def __init__(self, queue, checkdir, changesfile, count):
+        self.queue = queue
+        self.checkdir = checkdir
+        self.changesfile = changesfile
+        self.count = count
+
+class ChangesGenerator(threading.Thread):
+    """enqueues changes files to be imported"""
+    def __init__(self, queue):
+        self.queue = queue
+        self.session = DBConn().session()
+
+    def run(self):
+        cnf = Config()
+        for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]:
+            checkdir = cnf["Dir::Queue::%s" % (directory) ]
+            if os.path.exists(checkdir):
+                print "Looking into %s" % (checkdir)
+
+                for dirpath, dirnames, filenames in os.walk(checkdir, topdown=False):
+                    if not filenames:
+                        # Empty directory (or only subdirectories), next
+                        continue
+                    for changesfile in filenames:
+                        if not changesfile.endswith(".changes"):
+                            # Only interested in changes files.
+                            continue
+                            count += 1
+
+                            if not get_knownchange(session):
+                                self.queue.enqueue(ChangesToImport(directory, checkdir, changesfile, count))
+
+        self.queue.enqueue(EndOfChanges())
+
+class ImportThread(threading.Thread):
+    def __init__(self, queue):
+        self.queue = queue
+        self.session = DBConn().session()
+
+    def run(self):
+        while True:
+            try:
+                to_import = queue.dequeue()
+                if not to_import:
+                    return
+
+                print( "Directory %s, file %7d, failures %3d. (%s)" % (to_import.dirpath[-10:], to_import.count, failure, to_import.changesfile) )
+
+                changes = Changes()
+                changes.changes_file = to_import.changesfile
+                changesfile = os.path.join(to_import.dirpath, to_import.changesfile)
+                changes.changes = parse_changes(changesfile, signing_rules=-1)
+                changes.changes["fingerprint"] = check_signature(changesfile)
+                changes.add_known_changes(to_import.queue, self.session)
+                self.session.commit()
+
+            except InvalidDscError, line:
+                warn("syntax error in .dsc file '%s', line %s." % (f, line))
+                failure += 1
+
+            except ChangesUnicodeError:
+                warn("found invalid changes file, not properly utf-8 encoded")
+                failure += 1
+
+                print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile)
+
+
+
+def main():
+    cnf = Config()
+
+    arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")),
+                 ('j',"concurrency", "%s::%s" % (options_prefix,"Concurrency"),"HasArg"),
+                 ('q',"quiet", "%s::%s" % (options_prefix,"Quiet")),
+                 ('v',"verbose", "%s::%s" % (options_prefix,"Verbose")),
+                ]
+
+    args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)
+
+    num_threads = 1
+
+    if (len(args) < 1) or not commands.has_key(args[0]):
+        usage()
+
+    if cnf.has_key("%s::%s" % (options_prefix,"Help")):
+        usage()
+
+    level=logging.INFO
+    if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
+        level=logging.ERROR
+
+    elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
+        level=logging.DEBUG
+
+
+    logging.basicConfig( level=level,
+                         format='%(asctime)s %(levelname)s %(message)s',
+                         stream = sys.stderr )
+
+    if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")):
+        num_threads = int(Config()[ "%s::%s" %(options_prefix,"Suite")])
+
+
+    queue = OneAtATime()
+    ChangesGenerator(queue).start()
+
+    for i in range(num_threads):
+        ImportThread(queue).start()
+
+def which_suites(session):
+    """
+    return a list of suites to operate on
+    """
+    if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
+        suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
+    else:
+        suites = Config().SubTree("Suite").List()
+
+    return [get_suite(s.lower(), session) for s in suites]
+
+
+if __name__ == '__main__':
+    main()
diff --git a/daklib/changes.py b/daklib/changes.py
index c5ac64a..1bca6dc 100755
--- a/daklib/changes.py
+++ b/daklib/changes.py
@@ -189,9 +189,16 @@ class Changes(object):
             session.commit()
             session.close()
 
+
+    def mark_missing_fields(self):
+        """add "missing" in fields which we will require for the known_changes table"""
+        for key in ['urgency', 'maintainer', 'fingerprint', 'changedby' ]:
+            if (not self.changes.has_key(key)) or (not self.changes[key]):
+                self.changes[key]='missing'
+
     def add_known_changes(self, queue, session=None):
+        """add "missing" in fields which we will require for the known_changes table"""
         cnf = Config()
-
         if session is None:
             session = DBConn().session()
             privatetrans = True
@@ -200,6 +207,8 @@ class Changes(object):
         changesfile = os.path.join(dirpath, self.changes_file)
         filetime = datetime.datetime.fromtimestamp(os.path.getctime(changesfile))
 
+        self.mark_missing_fields()
+
         session.execute(
             """INSERT INTO known_changes
               (changesname, seen, source, binaries, architecture, version,
-- 
1.6.3.3


Reply to: