Steve's Nametag

Server Steve, One Script to Rule Them All.

Background

I have a lot of servers spread out, some at work and some at home, and I want to manage all of them from one place. I created Steve to handle many of the day-to-day tasks I need to do on my servers. Steve can check for updates, install updates, restart servers, set up an apt proxy, and remove the proxy. Steve saves me hours every month and saves me from having to remember IP addresses.

Steve uses Paramiko for the vast majority of his commands. Paramiko is an SSH library for Python that lets your code talk to devices over SSH. If a device has SSH, your Python can now send it commands. I use Steve/Paramiko to manage servers running Ubuntu, Fedora, CentOS, and even my Raspberry Pi boards. Paramiko adds a whole lot of functionality to Python.
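To make that concrete, here is a minimal sketch of sending one command over SSH with Paramiko. The helper names `open_client` and `run_command` are made up for this example and are not part of Steve, and the import sits inside `open_client` only so the second helper can be tried without Paramiko installed.

```python
def open_client(ip, user, password):
    """Open an SSH connection (hypothetical helper, not part of Steve)."""
    import paramiko  # third-party package: pip install paramiko

    client = paramiko.SSHClient()
    # Accept unknown host keys automatically. Convenient on a trusted
    # network, but it skips host verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(ip, username=user, password=password)
    return client


def run_command(client, command):
    """Run one command and return (exit_status, stdout_text, stderr_text)."""
    stdin, stdout, stderr = client.exec_command(command)
    # Read the output first, then ask for the exit status.
    out = stdout.read().decode()
    err = stderr.read().decode()
    # recv_exit_status() blocks until the remote command has finished.
    status = stdout.channel.recv_exit_status()
    return status, out, err
```

Usage would look something like `client = open_client("192.0.2.10", "steve", "change-me")` followed by `run_command(client, "uptime")` and `client.close()`, where the address and credentials are placeholders. An exit status of 0 means the remote command succeeded.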

Steve will, at some point, need a name change because I will be adding code to back up, deploy, and manage switches. SSH Steve? For the time being, Server Steve is the aptest name for him/it.

Saving Configs

I wrote Steve so I could add servers to a config file and never need to remember any details about a server. The server config stores the IP, username, password, online status, number of updates, and when each server was last seen. I use this together with Tailscale to easily manage work servers from home and home servers from work.
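As a rough illustration, one entry in that config could look like the sample below. The option names match the ones the script reads and writes (`name`, `ip`, `user`, `pass`, `type`, `status`, `updates`, `onlinetime`); the section name, address, and credentials are invented for this sketch.

```python
import configparser

# Hypothetical sample entry; real files live in configs/servers.ini.
SAMPLE = """
[pi-hole]
name = pi-hole
ip = 192.0.2.10
user = steve
pass = change-me
type = apt
status = 1
updates = 0
onlinetime = 2022-01-01 12:00:00
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE)

for section in config.sections():
    server = config[section]
    # Option names are case-insensitive, so server["IP"] and server["ip"]
    # refer to the same value.
    print(server["name"], server["IP"], server["type"])
    # Every value comes back as a string, so compare status with "1", not 1.
    print("online:", server["status"] == "1")
```

Keeping one section per server means adding a machine is just a matter of appending another block to the file.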

Future

Steve manages my Linux servers now, but I want to add the ability to manage my Windows server in the future. Steve already knows which distro branch each server belongs to and whether it uses apt or DNF to stay up to date. Expanding Server Steve to Windows should be straightforward because of the branch detection, but I haven’t researched how well Python integrates with Windows. PowerShell now runs on Linux, so that may be the route I choose. I have written a few programs in PowerShell, and talking to servers through WinRM from Linux may be the way to go.
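One way to picture the branch detection is a lookup from each server's `type` value to the commands for that package manager. This is only a sketch: the `UPGRADE_COMMANDS` table and the `upgrade_commands` function are names invented here, while the command lists themselves are copied from the script's `apt_full` and `dnf_full`.

```python
# Hypothetical lookup table keyed by the "type" option in the server config.
UPGRADE_COMMANDS = {
    "apt": ["apt update", "apt upgrade -y", "apt autoremove -y", "apt update"],
    "dnf": ["dnf update -y"],
}


def upgrade_commands(server_type):
    """Return the upgrade commands for a package manager type."""
    key = server_type.strip().lower()
    try:
        return UPGRADE_COMMANDS[key]
    except KeyError:
        # Fail loudly on an unknown type rather than running nothing.
        raise ValueError(f"unsupported package manager: {server_type!r}") from None
```

With a table like this, supporting another platform would mostly mean adding one more key, assuming its commands can be sent the same way.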

Code

#!/usr/bin/python3

import paramiko
# import select
import sys
import configparser
import os
import logging
import platform
import datetime
from os.path import exists

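# Build a timestamp without spaces or colons so the log name is valid on every platform.
x = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
# Create the log directory on first run.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(filename='logs/output.'+x+'.log', encoding='utf-8', level=logging.INFO)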

platform = platform.system()
clearscreen = 0

if clearscreen == 1:
    os.system('clear')

print(platform)
configServers = configparser.ConfigParser()
configServers.read('configs/servers.ini')
hosts = configServers.sections()

dnf_full = ["dnf update -y"]
apt_full = ["apt update","apt upgrade -y","apt autoremove -y","apt update"]
apt_update = ["apt update"]
dnf_stats = ["dnf install sysstat -y","mpstat"]
apt_stats = ["apt update","apt install sysstat -y","mpstat"]
apt_up_avail = "apt list --upgradable"
apt_proxy_path = "/etc/apt/apt.conf.d/00AptProxy"
apt_proxy_address = "http://XXX.XXX.XXX.XXX:3142/"
# Wrap the write in sh -c so the single sudo that TalkToServer prepends also covers the redirect.
apt_proxy_block = "sh -c \"echo 'Acquire::http::Proxy \\\"" + apt_proxy_address + "\\\";' > " + apt_proxy_path + "\""
apt_proxy = ['touch '+apt_proxy_path,'rm '+apt_proxy_path,'touch '+apt_proxy_path,apt_proxy_block]
#apt_proxy_perms = 'chmod 644 '+apt_proxy_path
apt_proxy_rm = ['rm '+apt_proxy_path]

data = {}

def init():
    global hosts
    for host in hosts:
        h = configServers[host]
        check_ping(h["Name"],h["ip"])
    if clearscreen == 1:
        os.system('clear')

def menu():
    global hosts
    x = int()
    y = int()
    #print(platform)
    print("***** Rob's Python Server Manager *****")
    print(f'Servers Loaded: {hosts}')
    print("Name        IP       Updates       Online")
    for host in hosts:
        h = configServers[host]
        print(h['name']+"        "+h['ip']+"       "+h['updates']+"       "+h['status'])
    print('Select a task:')
    print('1:    Update package proxy')
    print('11:   Remove package proxy')
    print('2:    Update apt caches')
    print('3:    Upgrade all servers')
    print('4:    Restart a server')
    print('5:    Server usage')
    print('6:    Add a server')
    print('0:    Exit')
    option = int(input("Selection: "))
    if(option == 3):
        if clearscreen == 1:
            os.system('clear')
        fullUpgrade()
    elif(option == 1):
        if clearscreen == 1:
            os.system('clear')
        proxy()
    elif(option == 11):
        if clearscreen == 1:
            os.system('clear')
        remove_proxy()
    elif(option == 2):
        if clearscreen == 1:
            os.system('clear')
        fullUpdate()
    elif(option == 4):
        if clearscreen == 1:
            os.system('clear')
        restart()
    elif(option == 5):
        if clearscreen == 1:
            os.system('clear')
        stats()
    elif(option == 6):
        if clearscreen == 1:
            os.system('clear')
        addServer()
    elif(option == 0):
        if clearscreen == 1:
            os.system('clear')
        updateConfig()
        print("Thanks for using Rob's Python Server Manager")
        print("Copyright \u00a9 2022 Rob Babel")
        sys.exit()

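def updateConfig():
    global hosts
    # Write back to the same file the config was loaded from.
    with open('configs/servers.ini', 'w') as configfile:
        configServers.write(configfile)
    # Refresh the module-level host list after the write.
    hosts = configServers.sections()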

def check_ping(Name,server):
    # ping exits with 0 when the host replied.
    if platform == 'Windows':
        # -n is the packet count; -w is the timeout in milliseconds on Windows.
        exit_code = os.system("ping -n 1 -w 1000 " + server.strip(";"))
    else:
        exit_code = os.system("ping -c 1 " + server.strip(";"))
    online = exit_code == 0
    if online:
        configServers[Name]['status'] = '1'
        configServers[Name]['onlinetime'] = str(datetime.datetime.now())
    else:
        configServers[Name]['status'] = '0'
    updateConfig()
    # Return a real boolean: the stored '0' and '1' strings are both truthy.
    return online

def write_file(host,inputfile,data):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host['ip'], username=host['user'], password=host['pass'])

    ftp = ssh.open_sftp()
    file = ftp.file(inputfile, "a", -1)
    file.write(data)
    file.flush()
    ftp.close()
    ssh.close()

def OutToConsole(Message):
    Message = Message.replace("\n", "")
    if Message != "" and Message !="\n":
        print(Message)

def TalkToServer(host,commands,jobid="None"):
    # Accept either a single command string or a list of commands.
    if not isinstance(commands, list):
        commands = [commands]
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host["IP"], username=host["User"], password=host["Pass"])
    stdoutput = []
    try:
        for command in commands:
            # sudo -S reads the password from stdin; -p '' hides the prompt.
            command = "sudo -S -p '' %s" % command
            logging.info("Job[%s]: Executing: %s" % (jobid, command))
            stdin, stdout, stderr = client.exec_command(command)
            stdin.write(host["Pass"]+ "\n")
            stdin.flush()
            stdoutput = [line for line in stdout]
            stderroutput = [line for line in stderr]
            for output in stdoutput:
                if apt_up_avail in output:
                    temp = output.split(" ")
                    configServers[host["name"]]["updates"] = temp[0]
                    updateConfig()
                logging.info("Job[%s]: %s" % (jobid, output.strip()))
            logging.debug("Job[%s]:stdout: %s" % (jobid, stdoutput))
            logging.debug("Job[%s]:stderror: %s" % (jobid, stderroutput))
            # Check the exit code once; zero means the command succeeded.
            status = stdout.channel.recv_exit_status()
            logging.info("Job[%s]:Command status: %s" % (jobid, status))
            if status != 0:
                # Stop at the first failing command and report its stderr.
                logging.error("Job[%s]: Command failed." % jobid)
                for output in stderroutput:
                    logging.error("Job[%s]: %s" % (jobid, output))
                return False, stderroutput
            logging.info("Job[%s]: Command executed." % jobid)
    finally:
        # Close the connection on both the success and failure paths.
        client.close()
    if not stdoutput:
        stdoutput = True
    return True, stdoutput

def fullUpgrade():
    for host in hosts:
        h = configServers[host]
        if check_ping(h['Name'],h['ip']):
            print(h['name']+" - Doing full updates")
            logging.info(h['name']+" - Doing full updates")
            #if h["type"] == "dnf":
                #commands = dnf_full
            if h["type"] == "apt":
                commands = apt_full
                for command in commands:
                    TalkToServer(h,command)
    if clearscreen == 1:
        os.system('clear')

def fullUpdate():
    for host in hosts:
        h = configServers[host]
        if check_ping(h['Name'],h['ip']):
            if h["Type"] == "apt":
                print(h['name']+" - Checking for updates")
                logging.info(h['name']+" - Checking for updates")
                commands = apt_update
                for command in commands:
                    TalkToServer(h,command,h['name']+" - Checking for updates")
    if clearscreen == 1:
        os.system('clear')

def restart():
    x = int()
    y = int()
    for host in hosts:
        h = configServers[host]
        # status is stored as the string '1' (online) or '0' (offline)
        if h['status'] == '1':
            x += 1
            print(str(x)+"   - "+host)
    x += 1
    print(str(x)+"   - Cancel")
    target = int(input("Restart: "))
    for host in hosts:
        h = configServers[host]
        if h['status'] == '1':
            y += 1
            if target == y:
    #           TalkToServer(h,"reboot now")
                restart()
    if clearscreen == 1:
        os.system('clear')

def stats():
    x = int()
    y = int()
    for host in hosts:
        h = configServers[host]
        if check_ping(h['Name'],h['ip']):
            x += 1
            print(str(x)+"   - "+host)
    x += 1
    print(str(x)+"   - Cancel")    
    target = int(input("Stats: "))
    for host in hosts:
        h = configServers[host]
        if check_ping(h['Name'],h['ip']):
            y += 1
            if target == y:
                if h["Type"] == "apt":
                    command = apt_stats
                if h["Type"] == "dnf":
                    command = dnf_stats
    #           TalkToServer(h,command)
                if x != y:
                    input("Press Enter to continue...")
                    stats()
    if clearscreen == 1:
        os.system('clear')

def proxy():
    for host in hosts:
        h = configServers[host]
        if check_ping(h['Name'],h['ip']):
            if h['status'] == '1':
                if h["Type"] == "apt":
                    print(h['name']+" - checking for proxy file")
                    logging.info(h['name']+" - checking for proxy file")
                    commands = apt_proxy
                    for command in commands:
                        TalkToServer(h,command)
                    #write_file(h,"/steve/00AptProxy","Acquire::http::Proxy "+apt_proxy_address+";")
                    #write_file(h,"/steve/00AptProxy","Acquire::https::Proxy "+apt_proxy_address+";")
                    TalkToServer(h,"mv /steve/00AptProxy "+apt_proxy_path)
    if clearscreen == 1:
        os.system('clear')

def remove_proxy():
    for host in hosts:
        h = configServers[host]
        if check_ping(h['Name'],h['ip']):
            if h['status'] == '1':
                if h["Type"] == "apt":
                    commands = apt_proxy_rm
                    for command in commands:
                        print(h['name']+" - removing proxy file")
                        logging.info(h['name']+" - removing proxy file")
                        TalkToServer(h,command)
    if clearscreen == 1:
        os.system('clear')

def addServer():
    global hosts
    Name = input("Name    : ")
    IPAdd = input("IP      : ")
    User = input("Username: ")
    Pass = input("Password: ")
    Type = input("apt/dnf : ")
    configServers.add_section(Name)
    configServers[Name]['name'] = Name
    configServers[Name]['ip'] = IPAdd
    configServers[Name]['user'] = User
    configServers[Name]['pass'] = Pass
    configServers[Name]['type'] = Type
    check_ping(Name,IPAdd)
    updateConfig()
    if clearscreen == 1:
        os.system('clear')
    
init()
while True:
    menu()
