As mentioned on BoF at DebConf, I started working on tests for our cloud images. For now I've come with attached scripts: * test_cloud.py should be run from personal machine; it'll start instance, run tests, and terminate instance * test_image.py is copied by test_cloud.py to running instance, will run there, and test some more As alioth is being decommissioned, I don't know where to create git repository for them. I'm hesistant to upload them to GitHub (after their change of EULA) - any other ideas? This is real work-in-progress. First I started with unit-testing, but wanted to have more control (at least in the beginning) so it is just Python with functions. Scripts don't even accept parameters; you just need to change variables at the beginning of test_cloud.py. I run those on our official AWS images and: * we can log-in as root * there is additional port opened, besides for 53 and 22 * there are logs from FAI left This goes against some of the points from last sprint: https://wiki.debian.org/Sprints/2016/DebianCloudNov2016?action=AttachFi le&do=view&target=TestIdeas.txt e.g. point 5 (no login for root), 9 (no unexpected opened ports), 18 (no build-instance logs). I created functions for most of the points in mentioned list. At the same time - some points are repeated there (5 and 12), (0 and 8). We probably I shall work on some more tests; my vision for now is to add some code (e.g. log and users checking just prints existing users, their groups, shell, etc. Similarly for sources.list). I also intend to change test_cloud.py to class-based approach, to have common code and to allow for intheritance to deal with different cloud providers. I'm still not sure if some variant of unit-testing makes sense here. It could lead to simplifying code (especilly when we have more tests) but I'm not sure if this is worth losing of control e.g. of order of execution of tests. Last remarks. As some of you probably noticed, I changed my GPG key. Debian Keyring contains my old key signed by new key, and I also uploaded my new key to public keyrings. At the sprint I'll ask for signatures and then I'll upload my new key to Debian keyring. Best regards. -- Tomasz Rybak, Debian Maintainer GPG: A565 CE64 F866 A258 4DDC F9C7 ECB7 3E37 E887 AA8C
#! /usr/bin/python3 import boto3 import paramiko import sys AMI = "ami-ea4fe285" REGION = "eu-central-1" PROFILE = "serpent" KEY = "serpent" KEY_FILE = "DebianAWS.pem" SG = "SSH" USERNAME = "admin" def start_image(ec2_connection, name): instances = ec2_connection.create_instances( ImageId=name, MinCount=1, MaxCount=1, KeyName=KEY, SecurityGroups=[SG], InstanceType="t2.micro") ec2_client = ec2_connection.meta.client waiter = ec2_client.get_waiter('instance_running') waiter.wait() instance = instances[0] return instance def connect_to_machine(instance, username): connection = paramiko.SSHClient() connection.set_missing_host_key_policy(paramiko.client.AutoAddPolicy()) ip = instance.public_ip_address connection.connect(ip, username=username) # connection.connect(ip, key_filename=KEY_FILE, username=USERNAME) return connection def disconnect_from_machine(connection): connection.close() def stop_image(instance): instance.terminate() def test_command(connection, command): channel = connection.get_transport().open_session() channel.exec_command(command) result = channel.recv_exit_status() print(result) channel.close() return result == 0 # TODO: check password login def test_root_login(instance): connection = paramiko.SSHClient() connection.set_missing_host_key_policy(paramiko.client.AutoAddPolicy()) ip = instance.public_ip_address try: connection.connect(ip, username="root") print(connection) return False except paramiko.ssh_exception.AuthenticationException: return True def test_open_ports(connection): stdin, stdout, stderr = connection.exec_command("sudo netstat -nlutp| tr -s ' ' | cut -d ' ' -f4") ports = set() lines = stdout.read().decode().splitlines() for line in lines[2:]: port = line.split(':')[-1] print(port) ports.add(port) print(ports) print(stderr.read()) return ports def copy_image_test_code(connection, file_name): transport = connection.get_transport() sftp = paramiko.SFTPClient.from_transport(transport) # TODO: test it without FTP on machine sftp.put(file_name, file_name) sftp.close() connection.exec_command("chmod 0700 {0}".format(file_name)) def run_image_test_code(connection, file_name): stdin, stdout, stderr = connection.exec_command("sudo ./{0}".format(file_name)) print("Stdout:") output = stdout.read().decode() for line in output.splitlines(): print(line) errors = stderr.read() if len(errors) > 0: print("Stderr:") print(errors) return False return True def test_sudo(connection): try: pass return True except: return False def test_apt(connection): try: stdin, stdout, stderr = connection.exec_command("sudo apt update") print(stdout.read()) print(stderr.read()) return True except: return False def test_image_startup(image): raise NotImplementedError("No idea how to do it right now") def check_sources_list(connection): pass def test_users(connection): pass def test_cloud_init(connection): pass def test_temporary_directories(connection): pass def test_log_directories(connection): pass def test_network_drivers(connection): raise NotImplementedError("No idea how to do it right now. Provider specific") def test_filesystems(connection): pass def test_size(image): return True if __name__ == '__main__': session = boto3.session.Session(region_name=REGION, profile_name=PROFILE) ec2_connection = session.resource("ec2") # Test image startup try: instance = start_image(ec2_connection, AMI) except: print("Got problems starting image") sys.exit(1) # instance = list(ec2_connection.instances.all())[0] connection = connect_to_machine(instance, USERNAME) if test_command(connection, "sudo apt update"): print("We can update with apt") else: print("Something is wrong") if not test_command(connection, "apt update"): print("We cannot update without sudo") else: print("Something is wrong. We can update without sudo") if test_command(connection, "sudo apt upgrade -y"): print("System upgraded with apt") else: print("Something is wrong") if test_root_login(instance): print("root login disallowed") else: print("Strange! we can login as root") ports = test_open_ports(connection) print("Open ports: {0}".format(ports)) non_standard_ports = ports.difference(set(['22', '68'])) if len(non_standard_ports) > 0: print("We have some non-standard opened ports: {0}".format(non_standard_ports)) # stdin, stdout, stderr = connection.exec_command("sudo apt update") # print(stdout.read()) # print(stderr.read()) copy_image_test_code(connection, "test_image.py") if not run_image_test_code(connection, "test_image.py"): print("We had problems with running test script on instance") disconnect_from_machine(connection) stop_image(instance)
#! /usr/bin/python3 import pathlib import pwd import apt import aptsources import aptsources.sourceslist def test_open_ports(connection): stdin, stdout, stderr = connection.exec_command("sudo netstat -nlutp| tr -s ' ' | cut -d ' ' -f4") s = stdout.read() a = s.decode() lines = a.splitlines() ports = set() for line in lines[2:]: port = line.split(':')[-1] print(port) ports.add(port) print(ports) print(stderr.read()) return ports def test_sudo(connection): try: pass return True except: return False def test_apt(connection): try: stdin, stdout, stderr = connection.exec_command("sudo apt update") print(stdout.read()) print(stderr.read()) return True except: return False def check_sources_list(): urls = set() distributions = set() components = set() apt_sources = aptsources.sourceslist.SourcesList() for entry in apt_sources: if not entry.disabled and entry.uri: urls.add(entry.uri) distributions.add(entry.dist) components = components.union(set(entry.comps)) print("URLs: {0}".format(urls)) print("Distributions: {0}".format(distributions)) print("Components: {0}".format(components)) def test_users(): names = set() shells = set() uids = set() gids = set() for entry in pwd.getpwall(): names.add(entry[0]) uids.add(entry[2]) gids.add(entry[3]) shells.add(entry[6]) print(entry) print("Users: {0}".format(names)) print("uids: {0}".format(uids)) print("gids: {0}".format(gids)) print("Shells: {0}".format(shells)) def test_cloud_init(): cloud_init_run = pathlib.Path("/var/run/cloud-init") cloud_init_lib = pathlib.Path("/var/lib/cloud-init") cloud_init_logs = pathlib.Path("/var/log/cloud-init") for child in cloud_init_lib.iterdir(): if child.is_file(): print(child) # TODO: check for errors for child in cloud_init_logs.iterdir(): if child.is_file(): print(child) # TODO: check for status and results for child in cloud_init_run.iterdir(): if child.is_file(): print(child) def check_file(directory): temp_file_name = "temporary" temp_file_path = directory / temp_file_name content = "Ala ma kota" with temp_file_path.open("w") as file: file.write(content) with temp_file_path.open("r") as file: read_content = file.read() if content != read_content: print("Something is wrong") temp_file_path.unlink() def test_temporary_directories(): check_file(pathlib.Path("/tmp")) check_file(pathlib.Path("/var/tmp")) def check_log(directory): for child in directory.iterdir(): if child.is_file(): print("File: {0}".format(child)) else: check_log(child) def test_log_directories(): # TODO: check for errors check_log(pathlib.Path("/var/log")) # TODO: check for any non-emtpy files? check_log(pathlib.Path("/var/log/fai")) def test_network_drivers(): raise NotImplementedError("No idea how to do it right now. Provider specific") def test_filesystems(): check_file(pathlib.Path("~").expanduser()) def test_installation(): package_name = "git" cache = apt.Cache() package = cache[package_name] package.mark_install() cache.commit() if __name__ == '__main__': test_installation() print("Packages installation tested") test_filesystems() print("Home directory R/W") test_temporary_directories() print("Temporary directories R/W") test_log_directories() print("Log directories OK") test_users() print("We have some users ;-)") try: test_cloud_init() print("Cloud init is") except: print("Problems with cloud init") check_sources_list() print("apt sources lists")
Attachment:
signature.asc
Description: This is a digitally signed message part