python修改paramiko模块开发运维审计堡垒机
分类:
IT文章
•
2024-03-20 07:40:49
1. 第三方paramiko模块下载:https://github.com/paramiko/paramiko
2. paramiko源码包演示:运行demo.py文件进行远程登录

3. 通过修改paramiko源码,主要是修改demo.py文件和interactive.py开发运维审计堡垒机
4. 堡垒机原理
堡垒机有一个公共账号,所有人都通过这个公共账号登录堡垒机;该账号只能用于登录,不能做其他操作;root用户可以做任何操作。
公共账号登录后,依次选择主机组、主机和要使用的用户;可以使用密码登录,也可以使用密钥登录。登录后的操作历史会记录到文件中。
5. 目录和文件介绍

6. 修改interactive.py文件
import socket
import sys
from paramiko.py3compat import u
import time
# termios/tty are only importable on POSIX platforms (Windows lacks them);
# has_termios records whether the import worked.
try:
    import termios
    import tty
except ImportError:
    has_termios = False
else:
    has_termios = True
def interactive_shell(chan):
    """Run an interactive session on *chan* with the platform's shell loop.

    Uses ``posix_shell`` when the ``termios`` import succeeded and
    ``windows_shell`` otherwise.
    """
    shell = posix_shell if has_termios else windows_shell
    shell(chan)
def posix_shell(chan, record_path='/root/tmp/record_comm/record.txt'):
    """Relay a raw-mode terminal session over *chan* and log typed commands.

    Data received from the channel is echoed to stdout; every character read
    from stdin is forwarded to the channel.  Typed characters are collected
    and, each time a carriage return is read, written to the audit file as
    one timestamped line.

    :param chan: channel object providing ``recv``, ``send`` and
        ``settimeout`` and usable with ``select.select``.
    :param record_path: file that receives the command log (opened in
        append mode).  Defaults to the path the original code hard-coded.
    :raises OSError: if the audit file cannot be opened; in that case the
        terminal mode has not been changed yet.
    """
    import select

    oldtty = termios.tcgetattr(sys.stdin)
    # Open the audit file BEFORE switching the terminal to raw mode: if the
    # open fails nothing needs restoring, and no cleanup code ever refers to
    # a file handle that was never created.
    with open(record_path, 'ab+') as f:
        try:
            tty.setraw(sys.stdin.fileno())
            tty.setcbreak(sys.stdin.fileno())
            chan.settimeout(0.0)
            records = []  # characters typed since the last carriage return
            while True:
                r, w, e = select.select([chan, sys.stdin], [], [])
                if chan in r:
                    try:
                        x = u(chan.recv(1024))
                        if len(x) == 0:
                            # NOTE(review): escape sequences were restored
                            # from a listing whose literals were garbled;
                            # confirm '\r\n' is what was intended.
                            sys.stdout.write('\r\n*** EOF\r\n')
                            break
                        sys.stdout.write(x)
                        sys.stdout.flush()
                    except socket.timeout:
                        pass
                if sys.stdin in r:
                    # Read one character at a time so each keystroke is
                    # forwarded immediately.
                    x = sys.stdin.read(1)
                    if len(x) == 0:
                        break
                    records.append(x)
                    if x == '\r':
                        # A carriage return ends the command: store it with a
                        # timestamp, using '\n' as the line terminator.
                        c_time = time.strftime('%Y-%m-%d %H:%M:%S')
                        cmd = ''.join(records).replace('\r', '\n')
                        f.write(('%s %s' % (c_time, cmd)).encode())
                        # Push the record to disk right away so it is not
                        # lost if the session ends abnormally.
                        f.flush()
                        records = []
                    chan.send(x)
        finally:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
# thanks to Mike Looijmans for this code
def windows_shell(chan):
    """Line-buffered fallback session loop for platforms without termios.

    A background thread copies everything received from *chan* to stdout
    while the calling thread forwards stdin to the channel one character at
    a time, until stdin reports end of file.

    :param chan: channel object providing ``recv`` and ``send``.
    """
    import threading

    # NOTE(review): the escape sequences in the literals below were restored
    # from a listing whose string literals were garbled; confirm '\r\n'.
    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n")

    def writeall(sock):
        """Copy data from *sock* to stdout until the peer closes."""
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write('\r\n*** EOF ***\r\n')
                sys.stdout.flush()
                break
            # recv() yields raw data; convert it with u() exactly as
            # posix_shell does before handing it to the text-mode stdout.
            sys.stdout.write(u(data))
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass
interactive.py
7. 编写zj.py脚本,选择登陆的主机组和主机
import os
def hosts_group():
    """Clear the screen, show the host-group menu and read the choice.

    The text typed by the user is stored in the module-level ``x_1``.
    """
    global x_1
    os.system('clear')
    menu = '【主机组列表】1.qqandroid组 2.qqios组'
    print(menu)
    prompt = '请输入你要选择的主机组编号:'
    x_1 = input(prompt)
def _lookup_entry(path, key):
    """Return the second space-separated field of the line whose first field
    equals *key*, or ``''`` when no usable line matches.

    When several lines match, the last one wins.  Lines with fewer than two
    fields are skipped.
    """
    found = ''
    with open(path) as f:
        for raw in f:
            fields = raw.strip('\n').split(' ')
            if len(fields) > 1 and fields[0] == key:
                found = fields[1]
    return found


def hosts():
    """Show the host menu for the selected group and store the chosen host.

    Reads the module-level ``x_1`` (group choice): ``'1'`` selects the
    qqandroid list, anything else the qqios list.  The host mapped to the
    number typed by the user is stored in the module-level ``lines``;
    ``lines`` is the empty string when the number is not in the list, so
    callers can test for "nothing selected" instead of hitting an undefined
    name.  Choosing ``'3'`` additionally launches the demo script again.
    """
    global lines
    lines = ''
    if x_1 == '1':
        menu = '【qqandroid组主机列表】 1.192.168.244.128 2.192.168.244.122 3.返回上一页'
        # NOTE(review): this path is relative while the qqios list uses an
        # absolute path -- confirm which is intended.
        path = '../hosts_user/qqandroid.txt'
    else:
        menu = '【qqios组主机列表】 1.192.168.244.128 2.192.168.244.123 3.返回上一页'
        path = '/root/tmp/hosts_user/qqios.txt'

    os.system('clear')
    print(menu)
    choice = input('请输入你要登陆的主机编号:')
    lines = _lookup_entry(path, choice)
    if choice == '3':
        # "Back": start the demo script from the beginning.
        os.system('/usr/bin/python /tmp/script/demo.py')
def yh():
    """Show the user menu and store the chosen login user.

    The user name mapped to the number typed by the user is read from
    ``/root/tmp/hosts_user/users.txt`` and stored in the module-level
    ``youruser``.  ``youruser`` is the empty string when the number is not
    in the file, so callers can test for "nothing selected" instead of
    hitting an undefined name.  Choosing ``'4'`` additionally launches the
    demo script again.
    """
    global youruser
    os.system('clear')
    print('【用户列表】1.root 2.test 3.test1 4.返回上一页')
    xyh = input('请输入你要登陆的主机用户编号:')

    youruser = ''
    with open('/root/tmp/hosts_user/users.txt') as f:
        for raw in f:
            fields = raw.strip('\n').split(' ')
            # Skip lines that have no second field instead of crashing on them.
            if len(fields) > 1 and fields[0] == xyh:
                youruser = fields[1]

    if xyh == '4':
        # "Back": start the demo script from the beginning.
        os.system('/usr/bin/python /tmp/script/demo.py')
zj.py
#!/usr/bin/env python
# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
import base64
from binascii import hexlify
import getpass
import os
import select
import socket
import sys
import time
import traceback
from paramiko.py3compat import input
import zj
import paramiko
try:
import interactive
except ImportError:
from . import interactive
def agent_auth(transport, username):
    """
    Offer each private key held by the local SSH agent to *transport* for
    *username*, stopping at the first key that is accepted.
    """
    agent_keys = paramiko.Agent().get_keys()
    for key in agent_keys:
        fingerprint = hexlify(key.get_fingerprint())
        print('Trying ssh-agent key %s' % fingerprint)
        try:
            transport.auth_publickey(username, key)
        except paramiko.SSHException:
            print('... nope.')
        else:
            print('... success!')
            return
def manual_auth(username, hostname):
    """
    Ask the user how to authenticate and perform that authentication on the
    module-level transport ``t``.

    ``r`` and ``d`` load an RSA or DSS private key from a path the user may
    override, prompting for a passphrase when the key file requires one;
    any other answer authenticates with a password.
    """
    default_auth = 'p'
    auth = input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth) or default_auth

    if auth not in ('r', 'd'):
        pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
        t.auth_password(username, pw)
        return

    # Only the branch actually chosen touches its key class and default path.
    if auth == 'r':
        label = 'RSA'
        key_class = paramiko.RSAKey
        default_path = '/root/.ssh/id_rsa'
    else:
        label = 'DSS'
        key_class = paramiko.DSSKey
        default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_dsa')

    path = input('%s key [%s]: ' % (label, default_path)) or default_path
    try:
        key = key_class.from_private_key_file(path)
    except paramiko.PasswordRequiredException:
        password = getpass.getpass('%s key password: ' % label)
        key = key_class.from_private_key_file(path, password)
    t.auth_publickey(username, key)
# setup logging
paramiko.util.log_to_file('demo.log')

# Audit file that receives the login/exit records written below.
RECORD_FILE = '/root/tmp/record_comm/record.txt'
# Address written into every login record.
# NOTE(review): hard-coded; presumably the bastion host's own address -- confirm.
SOURCE_ADDR = '192.168.244.129'


def _append_record(text):
    """Append *text* (encoded to bytes) to the audit file."""
    with open(RECORD_FILE, 'ab') as record:
        record.write(text.encode())


username = ''
if len(sys.argv) > 1:
    hostname = sys.argv[1]
    if hostname.find('@') >= 0:
        username, hostname = hostname.split('@')
else:
    # Interactive menu instead of a free-form hostname prompt.
    zj.hosts_group()
    zj.hosts()
    hostname = zj.lines
if len(hostname) == 0:
    print('*** Hostname required.')
    sys.exit(1)
port = 22
if hostname.find(':') >= 0:
    hostname, portstr = hostname.split(':')
    port = int(portstr)

# now connect
try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((hostname, port))
except Exception as e:
    print('*** Connect failed: ' + str(e))
    traceback.print_exc()
    sys.exit(1)

try:
    t = paramiko.Transport(sock)
    try:
        t.start_client()
    except paramiko.SSHException:
        print('*** SSH negotiation failed.')
        sys.exit(1)

    try:
        keys = paramiko.util.load_host_keys(os.path.expanduser('~/.ssh/known_hosts'))
    except IOError:
        try:
            keys = paramiko.util.load_host_keys(os.path.expanduser('~/ssh/known_hosts'))
        except IOError:
            print('*** Unable to open host keys file')
            keys = {}

    # check server's host key -- this is important.
    key = t.get_remote_server_key()
    if hostname not in keys:
        print('*** WARNING: Unknown host key!')
    elif key.get_name() not in keys[hostname]:
        print('*** WARNING: Unknown host key!')
    elif keys[hostname][key.get_name()] != key:
        print('*** WARNING: Host key has changed!!!')
        sys.exit(1)
    else:
        print('*** Host key OK.')

    # get username
    if username == '':
        default_username = getpass.getuser()
        # Interactive menu instead of a free-form username prompt.
        zj.yh()
        username = zj.youruser
        if len(username) == 0:
            username = default_username

    agent_auth(t, username)
    if not t.is_authenticated():
        manual_auth(username, hostname)
    if not t.is_authenticated():
        print('*** Authentication failed. :(')
        t.close()
        sys.exit(1)

    chan = t.open_session()
    chan.get_pty()
    chan.invoke_shell()
    # NOTE(review): the '\n' escapes below were restored from a listing whose
    # string literals were garbled; confirm.
    print('*** Here we go!\n')

    # Login record: a blank separator line, then who logged in, when, and
    # from which address.
    _append_record('\n')
    _append_record('%s login: %s from %s\n'
                   % (username, time.strftime('%c'), SOURCE_ADDR))
    try:
        interactive.interactive_shell(chan)
    finally:
        # Exit record: written to the SAME audit file as the login record,
        # stamped with the time the session actually ended, and written even
        # when the session loop fails.
        _append_record('%s exit: %s\n' % (username, time.strftime('%c')))
    chan.close()
    t.close()

except Exception as e:
    print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e))
    traceback.print_exc()
    try:
        t.close()
    except Exception:
        # Best effort: the transport may not exist or may already be closed.
        pass
    sys.exit(1)