使用Paramiko实现远程主机的命令行操作指南
最编程
2024-02-10 09:15:32
...
什么是 「Paramiko」? Paramiko是一个Python实现的SSHv2协议的库,可以用于在远程服务器上执行命令、上传和下载文件等操作。它使用了加密算法,可以提供安全的远程访问。由于其简单易用的API和丰富的功能,Paramiko被广泛用于自动化运维和云计算等领域。
1 使用的目的
- 执行远程主机脚本。
- 发起接口测试以后查询系统日志。
2 Paramiko 安装
使用效果图
demo类
- 可以看到查询需要用到的信息,主要是ip,用户名,密码,端口号
- 这些我们可以配置到yaml文件里面维护
3 使用yaml文件配置主机连接信息
- 创建一个工具类
import re
import time
import urllib
from time import sleep
# 定义一个类,表示一台远端linux主机
import paramiko
from paramiko import util
class Linux(object):
# 通过IP, 用户名,密码,超时时间初始化一个远程Linux主机
def __init__(self, ip, username, password,env ='默认环境',timeout=5):
self.ip = ip
self.username = username
self.password = password
self.timeout = timeout
# transport和chanel
self.trans = ''
self.chan = ''
# 链接失败的重试次数
self.try_times = 3
# 调用该方法连接远程主机
def connect(self):
while True:
# 连接过程中可能会抛出异常,比如网络不通、链接超时
try:
self.trans = paramiko.Transport(sock=(self.ip, 22))
self.trans.set_log_channel('paramiko')
util.log_to_file('test.log')
self.trans.connect(username=self.username, password=self.password)
self.chan = self.trans.open_session()
self.chan.settimeout(self.timeout)
self.chan.get_pty()
self.chan.invoke_shell()
# 如果没有抛出异常说明连接成功,直接返回
print(u'连接%s成功' % self.ip)
# 接收到的网络数据解码为str
print(self.chan.recv(65535).decode('utf-8'))
return
# 这里不对可能的异常如socket.error, socket.timeout细化,直接一网打尽
except Exception:
if self.try_times != 0:
print(u'连接%s失败,进行重试' % self.ip)
self.try_times -= 1
else:
print(u'重试3次失败,结束程序')
exit(1)
# 断开连接
def close(self):
print('关闭'+self.ip)
self.chan.close()
self.trans.close()
# 发送要执行的命令
def send(self, cmds=''):
cmds += '\r'
res = ''
self.chan.send(cmds)
# 特别备注,这里需要判断一条语句已经执行通过,结尾来判断。
# 终端结尾最后是']$ ',注意后边还有一个空格
shellhead = ']$ '
try:
while True:
# 日志预留回显时间
time.sleep(1)
ret = self.chan.recv(65535).decode()
res += ret
# 判断的结尾,就停止循环
if ret.endswith(shellhead):
break
return res
except Exception as e:
# 日志预留
self.chan.close()
self.trans.close()
def is_alive(self):
print(self.chan.closed) # 是否关闭
return self.chan.active # 是否活动
# def upload(self,local_path,target_path):
# # 连接,上传
# # file_name = self.create_file()
# sftp = paramiko.SFTPClient.from_transport(self.t)
# # 将location.py 上传至服务器 /tmp/mytest.py
# sftp.put(local_path, target_path)
# def download(self,remote_path,local_path):
# sftp = paramiko.SFTPClient.from_transport(self.t)
# sftp.get(remote_path,local_path)
if __name__ == '__main__':
transid = "流水关键字"
path, cmd = 'cd /路径', f'grep {transid} *文件名*'
hosts = "主机ip"
host_username,host_password = "账号", "密码"
host = Linux(hosts, host_username,host_password)
host.connect()
try:
host.send(path)
interface_str = host.send(cmd)
host.close()
new_text = urllib.parse.unquote(interface_str) #用同一个字符换成%xx转义。相当于JS中的urldecode(),对url进行解码。
print(new_text)
print(interface_str)
except Exception as e:
host.close()
测试
- 有可能我们的日志是随机打印到不同的主机上的,所以可以host配置成列表的。
主机配置
4 添加一个线程类:
- 多个主机时候我们不会一个个线性去查询,所以我们可以创建一个线程类去多线程调用查询。
# -*- coding: utf-8 -*-
# @Time : 2020/3/9 23:48
# @Author : 怪盗LYL
# @File : testthreading.py
#!/usr/bin/python3
import concurrent
import threading
import time
from loguru import logger
from unit.paramiko_unit import Linux
class grepThread (threading.Thread):
def __init__(self,host, host_username, host_password,path,cmd):
threading.Thread.__init__(self)
self.host = host
self.host_username = host_username
self.host_password = host_password
self.path = path
self.cmd = cmd
def run(self):
logger.info("开始线程查询主机:" + self.host)
host = Linux(self.host, self.host_username, self.host_password)
host.connect()
try:
host.send(self.path)
interface_str = host.send(self.cmd)
host.close()
self.result = interface_str
except Exception as e:
logger.error(e.__str__())
host.close()
logger.info ("退出线程:" + self.host)
def get_result(self):
return self.result,self.host
def do_grep(arg):
linux = arg[0]
path = arg[1]
cmd = arg[2]
linux.connect()
try:
linux.send(path)
interface_str = linux.send(cmd)
linux.close()
result = interface_str
except Exception as e:
logger.error(e.__str__())
linux.close()
logger.info("退出线程:" + linux.ip)
return result
def grepfunction(hosts, host_username, host_password,path,cmd):
linuxs = []
for i in range(0,len(hosts)):
linux = Linux(hosts[i], host_username, host_password)
linuxs.append(linux)
with concurrent.futures.ThreadPoolExecutor(len(hosts)) as executor:
to_do = []
for i in range(0,len(linuxs)): # 模拟多个任务
future = executor.submit(do_grep,(linuxs[i],path,cmd))
to_do.append(future)
result_list = []
for future in concurrent.futures.as_completed(to_do): # 并发执行
result_list.append(future.result())
return result_list
if __name__ == '__main__':
pass
- 简单测试一下
try:
transid = "关键字"
path, cmd = 'cd /路径', f'grep {transid} 文件名'
thred_list = []
hosts = [主机列表]
for i in range(0, len(hosts)):
thread = grepThread(hosts[i], "账号", "密码", path, cmd)
thred_list.append(thread)
thread.start()
gateway_str = ""
geted_host = ""
for i in thred_list:
i.join()
# pattern = re.compile(r'\[[^\r\t\n]*?\]') # []包起来的
result, host = i.get_result()
# list_str = pattern.findall(result)
# for j in list_str:
if "ACTIONNAME" in result:
gateway_str = result
geted_host = host
logger.info("退出网关查询主线程")
print(gateway_str, geted_host)
except Exception as e:
# logger.exception(e)
logger.error(e)
print({"error_code": 9999})
测试
7 结语
- 这样使用Paramiko入门操作就介绍完了。大家可以根据自己实际情况在自己的项目里使用。
- 目前我是封装到了我的测试类里面,请求和返回之后会根据返回的报文解析关键字去主机查询日志,里面再根据json或者xml格式化输出。xml格式化使用ElementTree,json格式化使用json,读者可以自行尝试。
#root = ET.fromstring("报文")
#minidom.parseString(ET.tostring(et)).toprettyxml(indent=' ')
#json.dumps(ssprequest.json_response, indent=2, ensure_ascii=False)
- 下面这段代码可能用到:
TRANS_ID_str = TRANS_ID_str.strip()
TRANS_ID_str = re.sub(r'\\[.*?\[K', '', TRANS_ID_str)
- 好久没更新了,最近在摆烂。
推荐阅读
-
windows下进程间通信的(13种方法)-摘 要 本文讨论了进程间通信与应用程序间通信的含义及相应的实现技术,并对这些技术的原理、特性等进行了深入的分析和比较。 ---- 关键词 信号 管道 消息队列 共享存储段 信号灯 远程过程调用 Socket套接字 MQSeries 1 引言 ---- 进程间通信的主要目的是实现同一计算机系统内部的相互协作的进程之间的数据共享与信息交换,由于这些进程处于同一软件和硬件环境下,利用操作系统提供的的编程接口,用户可以方便地在程序中实现这种通信;应用程序间通信的主要目的是实现不同计算机系统中的相互协作的应用程序之间的数据共享与信息交换,由于应用程序分别运行在不同计算机系统中,它们之间要通过网络之间的协议才能实现数据共享与信息交换。进程间通信和应用程序间通信及相应的实现技术有许多相同之处,也各有自己的特色。即使是同一类型的通信也有多种的实现方法,以适应不同情况的需要。 ---- 为了充分认识和掌握这两种通信及相应的实现技术,本文将就以下几个方面对这两种通信进行深入的讨论:问题的由来、解决问题的策略和方法、每种方法的工作原理和实现、每种实现方法的特点和适用的范围等。 2 进程间的通信及其实现技术 ---- 用户提交给计算机的任务最终都是通过一个个的进程来完成的。在一组并发进程中的任何两个进程之间,如果都不存在公共变量,则称该组进程为不相交的。在不相交的进程组中,每个进程都独立于其它进程,它的运行环境与顺序程序一样,而且它的运行环境也不为别的进程所改变。运行的结果是确定的,不会发生与时间相关的错误。 ---- 但是,在实际中,并发进程的各个进程之间并不是完全互相独立的,它们之间往往存在着相互制约的关系。进程之间的相互制约关系表现为两种方式: ---- (1) 间接相互制约:共享CPU ---- (2) 直接相互制约:竞争和协作 ---- 竞争——进程对共享资源的竞争。为保证进程互斥地访问共享资源,各进程必须互斥地进入各自的临界段。 ---- 协作——进程之间交换数据。为完成一个共同任务而同时运行的一组进程称为同组进程,它们之间必须交换数据,以达到协作完成任务的目的,交换数据可以通知对方可以做某事或者委托对方做某事。 ---- 共享CPU问题由操作系统的进程调度来实现,进程间的竞争和协作由进程间的通信来完成。进程间的通信一般由操作系统提供编程接口,由程序员在程序中实现。UNIX在这个方面可以说最具特色,它提供了一整套进程间的数据共享与信息交换的处理方法——进程通信机制(IPC)。因此,我们就以UNIX为例来分析进程间通信的各种实现技术。 ---- 在UNIX中,文件(File)、信号(Signal)、无名管道(Unnamed Pipes)、有名管道(FIFOs)是传统IPC功能;新的IPC功能包括消息队列(Message queues)、共享存储段(Shared memory segment)和信号灯(Semapores)。 ---- (1) 信号 ---- 信号机制是UNIX为进程中断处理而设置的。它只是一组预定义的值,因此不能用于信息交换,仅用于进程中断控制。例如在发生浮点错、非法内存访问、执行无效指令、某些按键(如ctrl-c、del等)等都会产生一个信号,操作系统就会调用有关的系统调用或用户定义的处理过程来处理。 ---- 信号处理的系统调用是signal,调用形式是: ---- signal(signalno,action) ---- 其中,signalno是规定信号编号的值,action指明当特定的信号发生时所执行的动作。 ---- (2) 无名管道和有名管道 ---- 无名管道实际上是内存中的一个临时存储区,它由系统安全控制,并且独立于创建它的进程的内存区。管道对数据采用先进先出方式管理,并严格按顺序操作,例如不能对管道进行搜索,管道中的信息只能读一次。 ---- 无名管道只能用于两个相互协作的进程之间的通信,并且访问无名管道的进程必须有共同的祖先。 ---- 系统提供了许多标准管道库函数,如: pipe——打开一个可以读写的管道; close——关闭相应的管道; read——从管道中读取字符; write——向管道中写入字符; ---- 有名管道的操作和无名管道类似,不同的地方在于使用有名管道的进程不需要具有共同的祖先,其它进程,只要知道该管道的名字,就可以访问它。管道非常适合进程之间快速交换信息。 ---- (3) 消息队列(MQ) ---- 消息队列是内存中独立于生成它的进程的一段存储区,一旦创建消息队列,任何进程,只要具有正确的的访问权限,都可以访问消息队列,消息队列非常适合于在进程间交换短信息。 ---- 消息队列的每条消息由类型编号来分类,这样接收进程可以选择读取特定的消息类型——这一点与管道不同。消息队列在创建后将一直存在,直到使用msgctl系统调用或iqcrm -q命令删除它为止。 ---- 系统提供了许多有关创建、使用和管理消息队列的系统调用,如: ---- int msgget(key,flag)——创建一个具有flag权限的MQ及其相应的结构,并返回一个唯一的正整数msqid(MQ的标识符); ---- int msgsnd(msqid,msgp,msgsz,msgtyp,flag)——向队列中发送信息; ---- int msgrcv(msqid,cmd,buf)——从队列中接收信息; ---- int msgctl(msqid,cmd,buf)——对MQ的控制操作; ---- (4) 共享存储段(SM) ---- 共享存储段是主存的一部分,它由一个或多个独立的进程共享。各进程的数据段与共享存储段相关联,对每个进程来说,共享存储段有不同的虚拟地址。系统提供的有关SM的系统调用有: ---- int shmget(key,size,flag)——创建大小为size的SM段,其相应的数据结构名为key,并返回共享内存区的标识符shmid; ---- char shmat(shmid,address,flag)——将当前进程数据段的地址赋给shmget所返回的名为shmid的SM段; ---- int shmdr(address)——从进程地址空间删除SM段; ---- int shmctl (shmid,cmd,buf)——对SM的控制操作; ---- SM的大小只受主存限制,SM段的访问及进程间的信息交换可以通过同步读写来完成。同步通常由信号灯来实现。SM非常适合进程之间大量数据的共享。 ---- (5) 信号灯 ---- 在UNIX中,信号灯是一组进程共享的数据结构,当几个进程竞争同一资源时(文件、共享内存或消息队列等),它们的操作便由信号灯来同步,以防止互相干扰。 ---- 信号灯保证了某一时刻只有一个进程访问某一临界资源,所有请求该资源的其它进程都将被挂起,一旦该资源得到释放,系统才允许其它进程访问该资源。信号灯通常配对使用,以便实现资源的加锁和解锁。 ---- 进程间通信的实现技术的特点是:操作系统提供实现机制和编程接口,由用户在程序中实现,保证进程间可以进行快速的信息交换和大量数据的共享。但是,上述方式主要适合在同一台计算机系统内部的进程之间的通信。 3 应用程序间的通信及其实现技术 ---- 同进程之间的相互制约一样,不同的应用程序之间也存在竞争和协作的关系。UNIX操作系统也提供一些可用于应用程序之间实现数据共享与信息交换的编程接口,程序员可以通过自己编程来实现。如远程过程调用和基于TCP/IP协议的套接字(Socket)编程。但是,相对普通程序员来说,它们涉及的技术比较深,编程也比较复杂,实现起来困难较大。 ---- 于是,一种新的技术应运而生——通过将有关通信的细节完全掩盖在某个独立软件内部,即底层的通讯工作和相应的维护管理工作由该软件内部来实现,用户只需要将通信任务提交给该软件去完成,而不必理会它的具体工作过程——这就是所谓的中间件技术。 ---- 我们在这里分别讨论这三种常用的应用程序间通信的实现技术——远程过程调用、会话编程技术和MQSeries消息队列技术。其中远程过程调用和会话编程属于比较低级的方式,程序员参与的程度较深,而MQSeries消息队列则属于比较高级的方式,即中间件方式,程序员参与的程度较浅。 ---- 4.1 远程过程调用(RPC)
-
如何使用JS轻松实现天、时、分、秒的倒计时功能,还包括当天特定时间的倒计时操作指南
-
如何使用ChatGPT?简单教程 - 实现ChatGPT的全面操作指南
-
使用AJAX实现网页的URL查询参数操作指南
-
使用GUI-Guider创建并实现在ESP32-S3上的打印模板操作指南
-
如何用Python的Paramiko库实现SSH远程连接与SFTP文件传输操作
-
如何使用Python轻松实现连接SSH并从服务器下载日志文件的操作指南
-
在Python里,如何轻松运用Paramiko库进行远程服务器连接,并实现文件的上传与下载操作
-
使用Python的Paramiko库实现SSH连接操作指南
-
利用Python的Paramiko库实现在Linux远程登录与SFTP操作指南