[Date Prev][Date Next] [Thread Prev][Thread Next] [Date Index] [Thread Index]

Testing scripts



As mentioned on BoF at DebConf, I started working on tests for
our cloud images.
For now I've come up with the attached scripts:
 * test_cloud.py should be run from personal machine; it'll
start instance, run tests, and terminate instance
 * test_image.py is copied by test_cloud.py to running instance,
will run there, and test some more

As alioth is being decommissioned, I don't know where to create
a git repository for them. I'm hesitant to upload them to GitHub
(after their change of EULA) - any other ideas?

This is real work-in-progress. First I started with unit-testing,
but wanted to have more control (at least in the beginning) so it
is just Python with functions. Scripts don't even accept parameters;
you just need to change variables at the beginning of test_cloud.py.

I run those on our official AWS images and:
 * we can log-in as root
 * there is an additional port open, besides 53 and 22
 * there are logs from FAI left
This goes against some of the points from last sprint:
https://wiki.debian.org/Sprints/2016/DebianCloudNov2016?action=AttachFile&do=view&target=TestIdeas.txt
e.g. point 5 (no login for root), 9 (no unexpected opened ports),
18 (no build-instance logs).
I created functions for most of the points in mentioned list.
At the same time - some points are repeated there (5 and 12),
(0 and 8). We probably 

I shall work on some more tests; my vision for now is to add
some code (e.g. log and users checking just prints existing users,
their groups, shell, etc. Similarly for sources.list). I also intend to
change test_cloud.py to a class-based approach, to have common code and
to allow for inheritance to deal with different cloud providers.
I'm still not sure if some variant of unit-testing makes sense here.
It could lead to simpler code (especially when we have more tests)
but I'm not sure if this is worth losing control, e.g. over the order of
execution of tests.

Last remarks. As some of you probably noticed, I changed my GPG key.
Debian Keyring contains my old key signed by new key, and I also
uploaded my new key to public keyrings. At the sprint I'll ask
for signatures and then I'll upload my new key to Debian keyring.

Best regards.

-- 
Tomasz Rybak, Debian Maintainer
GPG: A565 CE64 F866 A258 4DDC F9C7 ECB7 3E37 E887 AA8C
#! /usr/bin/python3

import boto3
import paramiko
import sys

AMI = "ami-ea4fe285"
REGION = "eu-central-1"
PROFILE = "serpent"
KEY = "serpent"
KEY_FILE = "DebianAWS.pem"
SG = "SSH"
USERNAME = "admin"


def start_image(ec2_connection, name):
    """Launch one EC2 instance from an image and wait until it is running.

    Args:
        ec2_connection: boto3 EC2 service resource used to create the instance.
        name: ID of the image (AMI) to boot.

    Returns:
        The created instance object, reloaded after it reached the
        ``running`` state.
    """
    instances = ec2_connection.create_instances(
        ImageId=name,
        MinCount=1,
        MaxCount=1,
        KeyName=KEY,
        SecurityGroups=[SG],
        InstanceType="t2.micro")
    instance = instances[0]
    ec2_client = ec2_connection.meta.client
    waiter = ec2_client.get_waiter('instance_running')
    # Wait for the instance we just created only; an unfiltered wait would
    # consider every instance visible to the account in this region.
    waiter.wait(InstanceIds=[instance.id])
    # Refresh the cached attributes so that values assigned after launch
    # (such as public_ip_address, read by the callers) are available.
    instance.reload()
    return instance


def connect_to_machine(instance, username):
    """Open an SSH session to the instance's public IP address.

    Args:
        instance: object exposing ``public_ip_address``.
        username: account name used to log in.

    Returns:
        The connected ``paramiko.SSHClient``.
    """
    client = paramiko.SSHClient()
    # Unknown host keys are accepted automatically.
    policy = paramiko.client.AutoAddPolicy()
    client.set_missing_host_key_policy(policy)
    client.connect(instance.public_ip_address, username=username)
    # Alternative with an explicit key file:
    # client.connect(ip, key_filename=KEY_FILE, username=USERNAME)
    return client


def disconnect_from_machine(connection):
    """Close the given connection by calling its ``close()`` method."""
    connection.close()


def stop_image(instance):
    """Terminate the given instance by calling its ``terminate()`` method."""
    instance.terminate()


def test_command(connection, command):
    channel = connection.get_transport().open_session()
    channel.exec_command(command)
    result = channel.recv_exit_status()
    print(result)
    channel.close()
    return result == 0


# TODO: check password login
def test_root_login(instance):
    """Check that logging in over SSH as ``root`` is rejected.

    Args:
        instance: object exposing ``public_ip_address``.

    Returns:
        True when the login attempt fails with an authentication error,
        False when the connection as root succeeds.
    """
    connection = paramiko.SSHClient()
    connection.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
    ip = instance.public_ip_address
    try:
        connection.connect(ip, username="root")
        print(connection)
        return False
    except paramiko.ssh_exception.AuthenticationException:
        return True
    finally:
        # Do not leave the probe session open, whatever the outcome.
        connection.close()


def test_open_ports(connection):
    stdin, stdout, stderr = connection.exec_command("sudo netstat -nlutp| tr -s ' ' | cut -d ' ' -f4")
    ports = set()
    lines = stdout.read().decode().splitlines()
    for line in lines[2:]:
        port = line.split(':')[-1]
        print(port)
        ports.add(port)
    print(ports)
    print(stderr.read())
    return ports


def copy_image_test_code(connection, file_name):
    """Upload a script to the remote machine and make it executable.

    The file is stored remotely under the same (relative) name and its mode
    is set to 0700 over the same SFTP session, so the permission change has
    completed by the time this function returns.

    Args:
        connection: connected SSH client providing ``open_sftp()``.
        file_name: local path of the file; also used as the remote path.
    """
    sftp = connection.open_sftp()
    try:
        # TODO: test it without FTP on machine
        sftp.put(file_name, file_name)
        sftp.chmod(file_name, 0o700)
    finally:
        # Close the SFTP session even when the transfer fails.
        sftp.close()


def run_image_test_code(connection, file_name):
    """Execute the uploaded script with sudo and echo its output.

    Args:
        connection: object whose ``exec_command(command)`` returns a
            ``(stdin, stdout, stderr)`` triple of readable byte streams.
        file_name: name of the script, run as ``sudo ./<file_name>``.

    Returns:
        True when the script wrote nothing to stderr, False otherwise.
    """
    command = "sudo ./{0}".format(file_name)
    _, out_stream, err_stream = connection.exec_command(command)
    print("Stdout:")
    for text in out_stream.read().decode().splitlines():
        print(text)
    error_output = err_stream.read()
    if len(error_output) == 0:
        return True
    # Anything on stderr is treated as a failure of the remote script.
    print("Stderr:")
    print(error_output)
    return False


def test_sudo(connection):
    try:
        pass
        return True
    except:
        return False


def test_apt(connection):
    try:
        stdin, stdout, stderr = connection.exec_command("sudo apt update")
        print(stdout.read())
        print(stderr.read())
        return True
    except:
        return False


def test_image_startup(image):
    raise NotImplementedError("No idea how to do it right now")


def check_sources_list(connection):
    """Placeholder for a sources.list check; does nothing and returns None."""


def test_users(connection):
    pass


def test_cloud_init(connection):
    pass


def test_temporary_directories(connection):
    pass


def test_log_directories(connection):
    pass


def test_network_drivers(connection):
    raise NotImplementedError("No idea how to do it right now. Provider specific")


def test_filesystems(connection):
    pass


def test_size(image):
    return True


# Entry point: boot an instance from AMI, run the remote checks against it,
# then close the SSH connection and terminate the instance.
if __name__ == '__main__':
    session = boto3.session.Session(region_name=REGION, profile_name=PROFILE)
    ec2_connection = session.resource("ec2")

    # Test image startup
    try:
        instance = start_image(ec2_connection, AMI)
    except Exception as error:
        print("Got problems starting image")
        print(error)
        sys.exit(1)
    # instance = list(ec2_connection.instances.all())[0]

    # From here on the instance exists, so it is always terminated, even
    # when one of the checks below raises.
    try:
        connection = connect_to_machine(instance, USERNAME)
        try:
            if test_command(connection, "sudo apt update"):
                print("We can update with apt")
            else:
                print("Something is wrong")

            if not test_command(connection, "apt update"):
                print("We cannot update without sudo")
            else:
                print("Something is wrong. We can update without sudo")

            if test_command(connection, "sudo apt upgrade -y"):
                print("System upgraded with apt")
            else:
                print("Something is wrong")

            if test_root_login(instance):
                print("root login disallowed")
            else:
                print("Strange! we can login as root")

            ports = test_open_ports(connection)
            print("Open ports: {0}".format(ports))
            # Ports 22 and 68 are the ones this script treats as expected.
            non_standard_ports = ports.difference({'22', '68'})
            if non_standard_ports:
                print("We have some non-standard opened ports: {0}".format(non_standard_ports))

            # stdin, stdout, stderr = connection.exec_command("sudo apt update")
            # print(stdout.read())
            # print(stderr.read())

            copy_image_test_code(connection, "test_image.py")
            if not run_image_test_code(connection, "test_image.py"):
                print("We had problems with running test script on instance")
        finally:
            disconnect_from_machine(connection)
    finally:
        stop_image(instance)
#! /usr/bin/python3

import pathlib
import pwd

import apt
import aptsources
import aptsources.sourceslist


def test_open_ports(connection):
    stdin, stdout, stderr = connection.exec_command("sudo netstat -nlutp| tr -s ' ' | cut -d ' ' -f4")
    s = stdout.read()
    a = s.decode()
    lines = a.splitlines()
    ports = set()
    for line in lines[2:]:
        port = line.split(':')[-1]
        print(port)
        ports.add(port)
    print(ports)
    print(stderr.read())
    return ports


def test_sudo(connection):
    try:
        pass
        return True
    except:
        return False


def test_apt(connection):
    try:
        stdin, stdout, stderr = connection.exec_command("sudo apt update")
        print(stdout.read())
        print(stderr.read())
        return True
    except:
        return False


def check_sources_list():
    """Print the URIs, distributions and components of enabled apt sources.

    Entries that are disabled or have an empty URI are ignored.
    """
    urls, distributions, components = set(), set(), set()
    for entry in aptsources.sourceslist.SourcesList():
        if entry.disabled or not entry.uri:
            continue
        urls.add(entry.uri)
        distributions.add(entry.dist)
        components.update(entry.comps)
    print("URLs: {0}".format(urls))
    print("Distributions: {0}".format(distributions))
    print("Components: {0}".format(components))


def test_users():
    """Print every password-database entry plus summary sets.

    The summaries are the distinct user names, uids, gids and login shells
    found in ``pwd.getpwall()``.
    """
    accounts = pwd.getpwall()
    for account in accounts:
        print(account)
    names = {account.pw_name for account in accounts}
    uids = {account.pw_uid for account in accounts}
    gids = {account.pw_gid for account in accounts}
    shells = {account.pw_shell for account in accounts}
    print("Users: {0}".format(names))
    print("uids: {0}".format(uids))
    print("gids: {0}".format(gids))
    print("Shells: {0}".format(shells))


def test_cloud_init():
    cloud_init_run = pathlib.Path("/var/run/cloud-init")
    cloud_init_lib = pathlib.Path("/var/lib/cloud-init")
    cloud_init_logs = pathlib.Path("/var/log/cloud-init")
    for child in cloud_init_lib.iterdir():
        if child.is_file():
            print(child)
    # TODO: check for errors
    for child in cloud_init_logs.iterdir():
        if child.is_file():
            print(child)
    # TODO: check for status and results
    for child in cloud_init_run.iterdir():
        if child.is_file():
            print(child)


def check_file(directory):
    """Verify that a file can be written to and read back from a directory.

    A file named ``temporary`` is created inside ``directory``, filled with a
    fixed text, read back and removed again. A mismatch between written and
    read content is reported on stdout.

    Args:
        directory: ``pathlib.Path`` of the directory to probe.

    Raises:
        OSError: if the file cannot be created, read or removed.
    """
    temp_file_name = "temporary"
    temp_file_path = directory / temp_file_name
    content = "Ala ma kota"
    created = False
    try:
        with temp_file_path.open("w") as file:
            created = True
            file.write(content)
        with temp_file_path.open("r") as file:
            read_content = file.read()
        if content != read_content:
            print("Something is wrong")
    finally:
        # Remove the scratch file even when writing or reading failed, but
        # only if this call managed to open it for writing.
        if created:
            temp_file_path.unlink()


def test_temporary_directories():
    """Run the write/read-back check on ``/tmp`` and then on ``/var/tmp``."""
    for location in ("/tmp", "/var/tmp"):
        check_file(pathlib.Path(location))


def check_log(directory):
    """Recursively print every regular file below a directory.

    Entries that are neither regular files nor directories (for example
    dangling symbolic links, sockets or FIFOs) are skipped instead of being
    treated as directories.

    Args:
        directory: ``pathlib.Path`` of the directory to walk.
    """
    for child in directory.iterdir():
        if child.is_file():
            print("File: {0}".format(child))
        elif child.is_dir():
            check_log(child)


def test_log_directories():
    """List the files under ``/var/log`` and, when present, ``/var/log/fai``.

    The FAI log directory is only walked when it exists, so its absence does
    not abort the run.
    """
    # TODO: check for errors
    check_log(pathlib.Path("/var/log"))
    # TODO: check for any non-empty files?
    fai_logs = pathlib.Path("/var/log/fai")
    if fai_logs.is_dir():
        check_log(fai_logs)


def test_network_drivers():
    raise NotImplementedError("No idea how to do it right now. Provider specific")


def test_filesystems():
    """Run the write/read-back check on the current user's home directory."""
    home_directory = pathlib.Path("~").expanduser()
    check_file(home_directory)


def test_installation():
    """Mark the ``git`` package for installation and commit the change.

    Uses the ``apt`` module's cache: the package is looked up, marked for
    installation, and the cache is committed.
    """
    cache = apt.Cache()
    cache["git"].mark_install()
    cache.commit()


# Entry point: run each on-instance check in turn and print a short status
# line after it returns.
if __name__ == '__main__':
    test_installation()
    print("Packages installation tested")
    test_filesystems()
    print("Home directory R/W")
    test_temporary_directories()
    print("Temporary directories R/W")
    test_log_directories()
    print("Log directories OK")
    test_users()
    print("We have some users ;-)")
    # The cloud-init check is best effort: a failure is reported but does
    # not stop the remaining checks. Only ordinary exceptions are caught so
    # that interrupts and interpreter exit still propagate.
    try:
        test_cloud_init()
        print("Cloud init is")
    except Exception as error:
        print("Problems with cloud init")
        print(error)
    check_sources_list()
    print("apt sources lists")

Attachment: signature.asc
Description: This is a digitally signed message part


Reply to: