[Date Prev][Date Next] [Thread Prev][Thread Next] [Date Index] [Thread Index]

[dak/master 01/10] daklib/gpg.py: small library for PGP-signed files



Signed-off-by: Ansgar Burchardt <ansgar@debian.org>
---
 daklib/gpg.py |  187 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 187 insertions(+), 0 deletions(-)
 create mode 100644 daklib/gpg.py

diff --git a/daklib/gpg.py b/daklib/gpg.py
new file mode 100644
index 0000000..5c396ec
--- /dev/null
+++ b/daklib/gpg.py
@@ -0,0 +1,187 @@
+"""Utilities for signed files
+
+@contact: Debian FTP Master <ftpmaster@debian.org>
+@copyright: 2011  Ansgar Burchardt <ansgar@debian.org>
+@license: GNU General Public License version 2 or later
+"""
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+import errno
+import fcntl
+import os
+import select
+import sys
+
+try:
+    _MAXFD = os.sysconf("SC_OPEN_MAX")
+except:
+    _MAXFD = 256
+
+class GpgException(Exception):
+    pass
+
+class _Pipe(object):
+    """context manager for pipes
+
+    Note: When the pipe is closed by other means than the close_r and close_w
+    methods, you have to set self.r (self.w) to None.
+    """
+    def __enter__(self):
+        (self.r, self.w) = os.pipe()
+        return self
+    def __exit__(self, type, value, traceback):
+        self.close_w()
+        self.close_r()
+        return False
+    def close_r(self):
+        """close reading side of the pipe"""
+        if self.r:
+            os.close(self.r)
+            self.r = None
+    def close_w(self):
+        """close writing side of the pipe"""
+        if self.w:
+            os.close(self.w)
+            self.w = None
+
+class SignedFile(object):
+    """handle files signed with PGP
+
+    The following attributes are available:
+      contents            - string with the content (after removing PGP armor)
+      valid               - Boolean indicating a valid signature was found
+      fingerprint         - fingerprint of the key used for signing
+      primary_fingerprint - fingerprint of the primary key associated with the key used for signing
+    """
+    def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
+        """
+        @param data: string containing the message
+        @param keyrings: sequence of keyrings
+        @param require_signature: if True (the default), will raise an exception if no valid signature was found
+        @param gpg: location of the gpg binary
+        """
+        self.gpg = gpg
+        self.keyrings = keyrings
+
+        self.valid = False
+        self.fingerprint = None
+        self.primary_fingerprint = None
+
+        self._verify(data, require_signature)
+
+    def _verify(self, data, require_signature):
+        with _Pipe() as stdin:
+         with _Pipe() as contents:
+          with _Pipe() as status:
+            pid = os.fork()
+            if pid == 0:
+                self._exec_gpg(stdin.r, contents.w, sys.stderr.fileno(), status.w)
+            else:
+                stdin.close_r()
+                contents.close_w()
+                status.close_w()
+
+                read = self._do_io([contents.r, status.r], {stdin.w: data})
+                stdin.w = None # was closed by _do_io
+
+                (pid_, exit_code, usage_) = os.wait4(pid, 0)
+
+                self.contents = read[contents.r]
+                self.status   = read[status.r]
+
+                if self.status == "":
+                    raise GpgException("No status output from GPG. (GPG exited with status code %s)" % exit_code)
+
+                for line in self.status.splitlines():
+                    self._parse_status(line)
+
+                if require_signature and not self.valid:
+                    raise GpgException("No valid signature found. (GPG exited with status code %s)" % exit_code)
+
+    def _do_io(self, read, write):
+        read_lines = dict( (fd, []) for fd in read )
+        write_pos = dict( (fd, 0) for fd in write )
+
+        read_set = list(read)
+        write_set = write.keys()
+        while len(read_set) > 0 or len(write_set) > 0:
+            r, w, x_ = select.select(read_set, write_set, ())
+            for fd in r:
+                data = os.read(fd, 4096)
+                if data == "":
+                    read_set.remove(fd)
+                read_lines[fd].append(data)
+            for fd in w:
+                data = write[fd][write_pos[fd]:]
+                if data == "":
+                    os.close(fd)
+                    write_set.remove(fd)
+                else:
+                    bytes_written = os.write(fd, data)
+                    write_pos[fd] += bytes_written
+
+        return dict( (fd, "".join(read_lines[fd])) for fd in read_lines.keys() )
+
+    def _parse_status(self, line):
+        fields = line.split()
+        if fields[0] != "[GNUPG:]":
+            raise GpgException("Unexpected output on status-fd: %s" % line)
+
+        # VALIDSIG    <fingerprint in hex> <sig_creation_date> <sig-timestamp>
+        #             <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
+        #             <hash-algo> <sig-class> <primary-key-fpr>
+        if fields[1] == "VALIDSIG":
+            self.valid = True
+            self.fingerprint = fields[2]
+            self.primary_fingerprint = fields[11]
+
+        if fields[1] == "BADARMOR":
+            raise GpgException("Bad armor.")
+
+        if fields[1] == "NODATA":
+            raise GpgException("No data.")
+
+        if fields[1] == "DECRYPTION_FAILED":
+            raise GpgException("Decryption failed.")
+
+        if fields[1] == "ERROR":
+            raise GpgException("Other error: %s %s" % (fields[2], fields[3]))
+
+    def _exec_gpg(self, stdin, stdout, stderr, statusfd):
+        try:
+            if stdin != 0:
+                os.dup2(stdin, 0)
+            if stdout != 1:
+                os.dup2(stdout, 1)
+            if stderr != 2:
+                os.dup2(stderr, 2)
+            if statusfd != 3:
+                os.dup2(statusfd, 3)
+            for fd in range(4):
+                old = fcntl.fcntl(fd, fcntl.F_GETFD)
+                fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
+            os.closerange(4, _MAXFD)
+
+            args = [self.gpg, "--status-fd=3", "--no-default-keyring"]
+            for k in self.keyrings:
+                args.append("--keyring=%s" % k)
+            args.extend(["--decrypt", "-"])
+
+            os.execvp(self.gpg, args)
+        finally:
+            sys.exit(1)
+
+# vim: set sw=4 et:
-- 
1.7.2.5



Reply to: