Source code for copr_keygen.logic

import traceback
import os
import re
import logging

from subprocess import PIPE, Popen
import tempfile
import sys

from .exceptions import GpgErrorException, KeygenServiceBaseException
from .gpg import gpg_cmd

log = logging.getLogger(__name__)


[docs] def get_passphrase_location(app, name_email): return os.path.join(app.config["PHRASES_DIR"], name_email)
[docs] def ensure_passphrase_exist(app, name_email): """ Need this to tell signd server that `name_email` available in keyring Key not protected by passphrase, so we write *something* to passphrase file. """ def create(): with open(location, "w") as handle: handle.write("1") handle.write(os.linesep) log.debug("created passphrase file for {}".format(name_email)) location = get_passphrase_location(app, name_email) try: with open(location) as handle: content = handle.read() if not content: create() except IOError: create()
[docs] def validate_name_email(name_email): """ We read get a value from clients, that looks like this frostyx#foo@copr.fedorahosted.org @copr#foo@copr.fedorahosted.org This function returns `True` if the `name_value` is in an expected format. """ if not "#" in name_email: return False name, email = name_email.lstrip("@").split("#", 1) if not re.match(r"^[\w.-]+$", name): return False # This obviously doesn't validate the correct email format. The regex for # validating email is too hardcore (see `wtforms.validators.Email`) and we # don't care anyway. We only want to make sure that no ilegal characters # were used. if not re.match(r"^[-\w.@]+$", email): return False if not email.count("@") == 1: return False return True
[docs] def user_exists(app, mail): """ Checks if the user identified by mail presents in keyring :return: bool True when user present :raises: GpgErrorException """ cmd = gpg_cmd + ["--armor", "--batch", "--export", "<{0}>".format(mail)] try: handle = Popen(cmd, stdout=PIPE, stderr=PIPE) stdout, stderr = handle.communicate() except Exception as e: log.exception(e) raise GpgErrorException(msg="unhandled exception during gpg call", cmd=" ".join(cmd), err=e) if "BEGIN PGP PUBLIC KEY BLOCK" in stdout.decode("utf-8"): # TODO: validate that the key is ultimately trusted log.debug("user {} has keys in keyring".format(mail)) ensure_passphrase_exist(app, mail) return True elif "nothing exported" in stderr.decode("utf-8"): log.debug("user {} not found in keyring".format(mail)) return False else: err = GpgErrorException(msg="unhandled error", cmd=cmd, stdout=stdout.decode(), stderr=stderr.decode()) log.error(err) raise err
template = """ %no-protection Key-Type: {key_type} Key-Length: {key_length} Name-Real: {name_real} Name-Comment: {comment} Name-Email: {name_email} Expire-Date: {expire} %commit """
[docs] def create_new_key( app, name_real, name_email, key_length, expire=None, name_comment=None): """ Creates new key for user. WARNING! This method doesn't check for the key duplicity. :param app: Flask application object :param name_real: name for key identification :param name_email: email for key identification :param key_length: length of key in bytes, accepts 1024 or 2048 :param expire: [optional] days for key to expire, default 0 == never expire :param name_comment: [optional] comment for key :return: (stdout, stderr) from `gpg` invocation """ try: # ! Don't use context manager with delete=True # TemporaryFile deletes file on .close() not on __exit__() out = tempfile.NamedTemporaryFile(delete=False) except Exception as e: raise KeygenServiceBaseException( msg="Failed to create tmp file for gen_key", err=e) try: out.write(template.format( key_type="RSA", key_length=key_length or 2048, name_real=name_real, comment=name_comment, name_email=name_email, expire=expire or 0).encode('utf-8')) out.close() except Exception as e: raise GpgErrorException(msg="Failed to write tmp file for gen_key", err=e) cmd = gpg_cmd + ["--batch", "--gen-key", out.name] log.debug("CMD: {}".format(' '.join(map(str, cmd)))) try: handle = Popen(cmd, stdout=PIPE, stderr=PIPE) stdout, stderr = handle.communicate() except Exception as e: log.exception(e) err = GpgErrorException(msg="unhandled exception during gpg call", cmd=" ".join(map(str, cmd)), err=e) log.error(err) raise err log.info("returncode: {}".format(handle.returncode)) log.info("stdout: {}".format(stdout)) log.info("stderr: {}".format(stderr)) if handle.returncode == 0: # TODO: validate that we really got armored gpg key if not user_exists(app, name_email): raise GpgErrorException( msg="Key was created, but not found in keyring" "this shouldn't be possible") log.info("Created key-pair for: {} ".format(name_email)) else: raise GpgErrorException(msg=stderr.decode()) try: os.remove(out.name) except Exception as e: log.error(e)