view hgext/gpg.py @ 1645:c6ffedc4f11b

add removed files to the changelog file list - this should allow better detection of removed file when walking in the history (like hg log) it doesn't help for the fast path of hg log where we only look at the filelog - users of the changelog file list shouldn't assume anymore that the file still exist (anyway it won't be found in the manifest like in 5ecf05541e11) - fix the tests (some hashes changed)
author Benoit Boissinot <benoit.boissinot@ens-lyon.org>
date Sun, 29 Jan 2006 08:38:31 +1300
parents ff339dd21976
children 0690d0f202e1
line wrap: on
line source

import os, tempfile, binascii, errno
from mercurial import util
from mercurial import node as hgnode

class gpg:
    def __init__(self, path, key=None):
        self.path = path
        self.key = (key and " --local-user \"%s\"" % key) or ""

    def sign(self, data):
        gpgcmd = "%s --sign --detach-sign%s" % (self.path, self.key)
        return util.filter(data, gpgcmd)

    def verify(self, data, sig):
        """ returns of the good and bad signatures"""
        try:
            fd, sigfile = tempfile.mkstemp(prefix="hggpgsig")
            fp = os.fdopen(fd, 'wb')
            fp.write(sig)
            fp.close()
            fd, datafile = tempfile.mkstemp(prefix="hggpgdata")
            fp = os.fdopen(fd, 'wb')
            fp.write(data)
            fp.close()
            gpgcmd = "%s --logger-fd 1 --status-fd 1 --verify \"%s\" \"%s\"" % (self.path, sigfile, datafile)
            #gpgcmd = "%s --status-fd 1 --verify \"%s\" \"%s\"" % (self.path, sigfile, datafile)
            ret = util.filter("", gpgcmd)
        except:
            for f in (sigfile, datafile):
                try:
                    if f: os.unlink(f)
                except: pass
            raise
        keys = []
        key, fingerprint = None, None
        err = ""
        for l in ret.splitlines():
            # see DETAILS in the gnupg documentation
            # filter the logger output
            if not l.startswith("[GNUPG:]"):
                continue
            l = l[9:]
            if l.startswith("ERRSIG"):
                err = "error while verifying signature"
                break
            elif l.startswith("VALIDSIG"):
                # fingerprint of the primary key
                fingerprint = l.split()[10]
            elif (l.startswith("GOODSIG") or
                  l.startswith("EXPSIG") or
                  l.startswith("EXPKEYSIG") or
                  l.startswith("BADSIG")):
                if key is not None:
                    keys.append(key + [fingerprint])
                key = l.split(" ", 2)
                fingerprint = None
        if err:
            return err, []
        if key is not None:
            keys.append(key + [fingerprint])
        return err, keys

def newgpg(ui, **opts):
    gpgpath = ui.config("gpg", "cmd", "gpg")
    gpgkey = opts.get('key')
    if not gpgkey:
        gpgkey = ui.config("gpg", "key", None)
    return gpg(gpgpath, gpgkey)

def check(ui, repo, rev):
    """verify all the signatures there may be for a particular revision"""
    mygpg = newgpg(ui)
    rev = repo.lookup(rev)
    hexrev = hgnode.hex(rev)
    keys = []

    def addsig(fn, ln, l):
        if not l: return
        n, v, sig = l.split(" ", 2)
        if n == hexrev:
            data = node2txt(repo, rev, v)
            sig = binascii.a2b_base64(sig)
            err, k = mygpg.verify(data, sig)
            if not err:
                keys.append((k, fn, ln))
            else:
                ui.warn("%s:%d %s\n" % (fn, ln , err))

    fl = repo.file(".hgsigs")
    h = fl.heads()
    h.reverse()
    # read the heads
    for r in h:
        ln = 1
        for l in fl.read(r).splitlines():
            addsig(".hgsigs|%s" % hgnode.short(r), ln, l)
            ln +=1
    try:
        # read local signatures
        ln = 1
        f = repo.opener("localsigs")
        for l in f:
            addsig("localsigs", ln, l)
            ln +=1
    except IOError:
        pass

    if not keys:
        ui.write("%s not signed\n" % hgnode.short(rev))
        return
    valid = []
    # warn for expired key and/or sigs
    for k, fn, ln in keys:
        prefix = "%s:%d" % (fn, ln)
        for key in k:
            if key[0] == "BADSIG":
                ui.write("%s Bad signature from \"%s\"\n" % (prefix, key[2]))
                continue
            if key[0] == "EXPSIG":
                ui.write("%s Note: Signature has expired"
                         " (signed by: \"%s\")\n" % (prefix, key[2]))
            elif key[0] == "EXPKEYSIG":
                ui.write("%s Note: This key has expired"
                         " (signed by: \"%s\")\n" % (prefix, key[2]))
            valid.append((key[1], key[2], key[3]))
    # print summary
    ui.write("%s is signed by:\n" % hgnode.short(rev))
    for keyid, user, fingerprint in valid:
        role = getrole(ui, fingerprint)
        ui.write("  %s (%s)\n" % (user, role))

def getrole(ui, fingerprint):
    return ui.config("gpg", fingerprint, "no role defined")

def sign(ui, repo, *revs, **opts):
    """add a signature for the current tip or a given revision"""
    mygpg = newgpg(ui, **opts)
    sigver = "0"
    sigmessage = ""
    if revs:
        nodes = [repo.lookup(n) for n in revs]
    else:
        nodes = [repo.changelog.tip()]

    for n in nodes:
        hexnode = hgnode.hex(n)
        ui.write("Signing %d:%s\n" % (repo.changelog.rev(n),
                                      hgnode.short(n)))
        # build data
        data = node2txt(repo, n, sigver)
        sig = mygpg.sign(data)
        if not sig:
            raise util.Abort("Error while signing")
        sig = binascii.b2a_base64(sig)
        sig = sig.replace("\n", "")
        sigmessage += "%s %s %s\n" % (hexnode, sigver, sig)

    # write it
    if opts['local']:
        repo.opener("localsigs", "ab").write(sigmessage)
        return

    for x in repo.changes():
        if ".hgsigs" in x and not opts["force"]:
            raise util.Abort("working copy of .hgsigs is changed "
                             "(please commit .hgsigs manually"
                             "or use --force)")

    repo.wfile(".hgsigs", "ab").write(sigmessage)

    if repo.dirstate.state(".hgsigs") == '?':
        repo.add([".hgsigs"])

    if opts["no_commit"]:
        return

    message = opts['message']
    if not message:
        message = "\n".join(["Added signature for changeset %s" % hgnode.hex(n)
                             for n in nodes])
    try:
        repo.commit([".hgsigs"], message, opts['user'], opts['date'])
    except ValueError, inst:
        raise util.Abort(str(inst))

def node2txt(repo, node, ver):
    """map a manifest into some text"""
    if ver == "0":
        return "%s\n" % hgnode.hex(node)
    else:
        util.Abort("unknown signature version")

cmdtable = {
    "sign":
        (sign,
         [('l', 'local', None, "make the signature local"),
          ('f', 'force', None, "sign even if the sigfile is modified"),
          ('', 'no-commit', None, "do not commit the sigfile after signing"),
          ('m', 'message', "", "commit message"),
          ('d', 'date', "", "date code"),
          ('u', 'user', "", "user"),
          ('k', 'key', "", "the key id to sign with")],
         "hg sign [OPTION]... REVISIONS"),
    "sigcheck": (check, [], 'hg sigcheck REVISION')
}