Module netmiko.scp_handler
Netmiko SCP operations.
Supports file get and file put operations.
SCP requires a separate SSH connection for a control channel.
Currently only supports Cisco IOS and Cisco ASA.
Source code
"""
Netmiko SCP operations.
Supports file get and file put operations.
SCP requires a separate SSH connection for a control channel.
Currently only supports Cisco IOS and Cisco ASA.
"""
import re
import os
import hashlib
import scp
import platform
class SCPConn(object):
    """
    Secure copy channel to the remote network device.

    The SCP connection must be closed for a transferred file to be written to the
    remote filesystem.
    """

    def __init__(self, ssh_conn, socket_timeout=10.0, progress=None, progress4=None):
        # Keep a handle on the existing control channel; its connection
        # parameters are reused to open the dedicated SCP session.
        self.ssh_ctl_chan = ssh_conn
        self.socket_timeout = socket_timeout
        self.progress = progress
        self.progress4 = progress4
        self.establish_scp_conn()

    def establish_scp_conn(self):
        """Open a new SSH client and wrap its transport in an SCP client."""
        connect_kwargs = self.ssh_ctl_chan._connect_params_dict()
        ssh_client = self.ssh_ctl_chan._build_ssh_client()
        self.scp_conn = ssh_client
        ssh_client.connect(**connect_kwargs)
        transport = ssh_client.get_transport()
        self.scp_client = scp.SCPClient(
            transport,
            socket_timeout=self.socket_timeout,
            progress=self.progress,
            progress4=self.progress4,
        )

    def scp_transfer_file(self, source_file, dest_file):
        """Upload a file via SCP (retained for backwards compatibility)."""
        self.scp_client.put(source_file, dest_file)

    def scp_get_file(self, source_file, dest_file):
        """Download a file via SCP.

        An EOFError raised during the transfer is ignored when the control
        channel's device_type contains "cisco_ios" or "cisco_xe"; on every
        other platform it propagates to the caller.
        """
        device_type = self.ssh_ctl_chan.device_type
        ignore_eof = "cisco_ios" in device_type or "cisco_xe" in device_type
        try:
            self.scp_client.get(source_file, dest_file)
        except EOFError:
            if not ignore_eof:
                raise

    def scp_put_file(self, source_file, dest_file):
        """Upload a file via SCP."""
        self.scp_client.put(source_file, dest_file)

    def close(self):
        """Shut down the underlying SSH connection used for SCP."""
        self.scp_conn.close()
class BaseFileTransfer(object):
    """Class to manage SCP file transfer and associated SSH control channel.

    The control channel (``ssh_conn``) is used to run CLI commands on the remote
    device (directory listings, MD5 verification, configuration), while a
    separate ``SCPConn`` created by ``establish_scp_conn`` carries the file data.
    """

    def __init__(
        self,
        ssh_conn,
        source_file,
        dest_file,
        file_system=None,
        direction="put",
        socket_timeout=10.0,
        progress=None,
        progress4=None,
        hash_supported=True,
    ):
        """Record transfer parameters and pre-compute source hash and size.

        Args:
            ssh_conn: control-channel connection object; must expose
                ``device_type`` and the send_command* methods used below.
            source_file: file to copy (local for "put", remote for "get").
            dest_file: destination file name.
            file_system: remote file system; auto-detected via
                ``ssh_conn._autodetect_fs()`` when omitted and the device type
                contains cisco_ios, cisco_xe or cisco_xr.
            direction: "put" (local -> remote) or "get" (remote -> local).
            socket_timeout: passed through to ``SCPConn``.
            progress: passed through to ``SCPConn``.
            progress4: passed through to ``SCPConn``.
            hash_supported: when False, ``source_md5`` is set to None instead
                of being computed.

        Raises:
            ValueError: no file system given and it cannot be auto-detected,
                or ``direction`` is neither "put" nor "get".
        """
        self.ssh_ctl_chan = ssh_conn
        self.source_file = source_file
        self.dest_file = dest_file
        self.direction = direction
        self.socket_timeout = socket_timeout
        self.progress = progress
        self.progress4 = progress4
        # Set by establish_scp_conn(); defined up front so close_scp_chan()
        # is safe even if no SCP connection was ever opened.
        self.scp_conn = None

        auto_flag = (
            "cisco_ios" in ssh_conn.device_type
            or "cisco_xe" in ssh_conn.device_type
            or "cisco_xr" in ssh_conn.device_type
        )
        if not file_system:
            if auto_flag:
                self.file_system = self.ssh_ctl_chan._autodetect_fs()
            else:
                raise ValueError("Destination file system not specified")
        else:
            self.file_system = file_system

        if direction == "put":
            self.source_md5 = self.file_md5(source_file) if hash_supported else None
            self.file_size = os.stat(source_file).st_size
        elif direction == "get":
            self.source_md5 = (
                self.remote_md5(remote_file=source_file) if hash_supported else None
            )
            self.file_size = self.remote_file_size(remote_file=source_file)
        else:
            raise ValueError("Invalid direction specified")

    def __enter__(self):
        """Context manager setup."""
        self.establish_scp_conn()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Context manager cleanup."""
        self.close_scp_chan()

    def establish_scp_conn(self):
        """Establish SCP connection."""
        self.scp_conn = SCPConn(
            self.ssh_ctl_chan,
            socket_timeout=self.socket_timeout,
            progress=self.progress,
            progress4=self.progress4,
        )

    def close_scp_chan(self):
        """Close the SCP connection to the remote network device.

        Does nothing when no SCP connection is currently held, so it can be
        called repeatedly or before ``establish_scp_conn``.
        """
        # getattr: a subclass may override __init__ without defining scp_conn
        scp_conn = getattr(self, "scp_conn", None)
        if scp_conn is not None:
            scp_conn.close()
        self.scp_conn = None

    def remote_space_available(self, search_pattern=r"(\d+) \w+ free"):
        """Return space available on remote device.

        Runs ``dir <file_system>`` on the control channel and extracts the
        first group of ``search_pattern``. When the matched text mentions
        "kbytes"/"Kbytes" the number is multiplied by 1000.

        Raises:
            ValueError: ``search_pattern`` does not match the command output.
        """
        remote_cmd = f"dir {self.file_system}"
        remote_output = self.ssh_ctl_chan.send_command_expect(remote_cmd)
        match = re.search(search_pattern, remote_output)
        if match is None:
            raise ValueError(
                "Parsing error, unexpected output from {}:\n{}".format(
                    remote_cmd, remote_output
                )
            )
        if "kbytes" in match.group(0) or "Kbytes" in match.group(0):
            return int(match.group(1)) * 1000
        return int(match.group(1))

    def _remote_space_available_unix(self, search_pattern=""):
        """Return space available on *nix system (BSD/Linux).

        ``search_pattern`` is accepted for signature compatibility but unused.

        Raises:
            ValueError: the ``df -k`` output does not have the expected layout.
        """
        self.ssh_ctl_chan._enter_shell()
        remote_cmd = f"/bin/df -k {self.file_system}"
        remote_output = self.ssh_ctl_chan.send_command(
            remote_cmd, expect_string=r"[\$#]"
        )
        # Leave the shell before parsing so that a parsing error below does not
        # exit this method with the session still inside the shell.
        self.ssh_ctl_chan._return_cli()

        # Try to ensure parsing is correct:
        # Filesystem   1K-blocks  Used   Avail Capacity  Mounted on
        # /dev/bo0s3f    1264808 16376 1147248     1%    /cf/var
        remote_output = remote_output.strip()
        output_lines = remote_output.splitlines()
        parse_error = "Parsing error, unexpected output from {}:\n{}".format(
            remote_cmd, remote_output
        )
        # First line is the header; second is the actual file system info
        if len(output_lines) < 2:
            raise ValueError(parse_error)
        header_line = output_lines[0]
        header_fields = header_line.split()
        filesystem_fields = output_lines[1].split()

        if (
            "Filesystem" not in header_line
            or len(header_fields) < 4
            or "Avail" not in header_fields[3]
        ):
            raise ValueError(parse_error)
        if len(filesystem_fields) < 4 or not re.search(
            r"^\d+$", filesystem_fields[3]
        ):
            raise ValueError(parse_error)

        # df -k reports 1K blocks
        return int(filesystem_fields[3]) * 1024

    def local_space_available(self):
        """Return space available on local filesystem.

        Measured for the current working directory (".").
        """
        if platform.system() == "Windows":
            import ctypes

            free_bytes = ctypes.c_ulonglong(0)
            ctypes.windll.kernel32.GetDiskFreeSpaceExW(
                ctypes.c_wchar_p("."), None, None, ctypes.pointer(free_bytes)
            )
            return free_bytes.value
        else:
            destination_stats = os.statvfs(".")
            return destination_stats.f_bsize * destination_stats.f_bavail

    def verify_space_available(self, search_pattern=r"(\d+) \w+ free"):
        """Verify sufficient space is available on destination file system (return boolean).

        Raises:
            ValueError: ``self.direction`` is neither "put" nor "get".
        """
        if self.direction == "put":
            space_avail = self.remote_space_available(search_pattern=search_pattern)
        elif self.direction == "get":
            space_avail = self.local_space_available()
        else:
            raise ValueError("Invalid direction specified")
        return space_avail > self.file_size

    def check_file_exists(self, remote_cmd=""):
        """Check if the dest_file already exists on the file system (return boolean).

        For "put" the remote device is queried (default command
        ``dir <file_system>/<dest_file>``); for "get" the local path is checked.

        Raises:
            ValueError: the remote output is neither a known error message nor
                a "Directory of ..." listing mentioning dest_file.
        """
        if self.direction == "put":
            if not remote_cmd:
                remote_cmd = f"dir {self.file_system}/{self.dest_file}"
            remote_out = self.ssh_ctl_chan.send_command_expect(remote_cmd)
            # Escape the file name: it is literal text, not a regex fragment
            search_string = r"Directory of .*{0}".format(re.escape(self.dest_file))
            if (
                "Error opening" in remote_out
                or "No such file or directory" in remote_out
                or "Path does not exist" in remote_out
            ):
                return False
            elif re.search(search_string, remote_out, flags=re.DOTALL):
                return True
            else:
                raise ValueError("Unexpected output from check_file_exists")
        elif self.direction == "get":
            return os.path.exists(self.dest_file)

    def _check_file_exists_unix(self, remote_cmd=""):
        """Check if the dest_file already exists on the file system (return boolean).

        For "put" this runs ``ls <file_system>`` from the shell and reports
        whether dest_file appears anywhere in the output (substring match).
        The ``remote_cmd`` argument is overwritten and therefore has no effect.
        """
        if self.direction == "put":
            self.ssh_ctl_chan._enter_shell()
            remote_cmd = f"ls {self.file_system}"
            remote_out = self.ssh_ctl_chan.send_command(
                remote_cmd, expect_string=r"[\$#]"
            )
            self.ssh_ctl_chan._return_cli()
            return self.dest_file in remote_out
        elif self.direction == "get":
            return os.path.exists(self.dest_file)

    def remote_file_size(self, remote_cmd="", remote_file=None):
        """Get the file size of the remote file.

        Args:
            remote_cmd: command to run; defaults to
                ``dir <file_system>/<remote_file>``.
            remote_file: file name; defaults to dest_file for "put" and
                source_file for "get".

        Raises:
            IOError: the device reports the file as missing, or the listing
                cannot be parsed.
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        if not remote_cmd:
            remote_cmd = f"dir {self.file_system}/{remote_file}"
        remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
        # Strip out "Directory of flash:/filename line
        remote_out = "".join(re.split(r"Directory of .*", remote_out))

        # Report a missing file as such before attempting to parse a size
        if "Error opening" in remote_out or "No such file or directory" in remote_out:
            raise IOError("Unable to find file on remote system")

        # Match line containing file name
        escape_file_name = re.escape(remote_file)
        pattern = r".*({}).*".format(escape_file_name)
        match = re.search(pattern, remote_out)
        if not match:
            raise IOError("Unable to parse 'dir' output in remote_file_size method")
        # Format will be 26  -rw-   6738  Jul 30 2016 19:49:50 -07:00  filename
        fields = match.group(0).split()
        try:
            return int(fields[2])
        except (IndexError, ValueError) as exc:
            raise IOError(
                "Unable to parse 'dir' output in remote_file_size method"
            ) from exc

    def _remote_file_size_unix(self, remote_cmd="", remote_file=None):
        """Get the file size of the remote file.

        Runs ``ls -l <file_system>/<remote_file>`` from the shell (unless
        ``remote_cmd`` is given) and returns the fifth whitespace-separated
        field of the line that mentions the file.

        Raises:
            IOError: the output contains "No such file or directory".
            ValueError: no line mentioning the file is found.
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        remote_file = f"{self.file_system}/{remote_file}"
        if not remote_cmd:
            remote_cmd = f"ls -l {remote_file}"

        self.ssh_ctl_chan._enter_shell()
        remote_out = self.ssh_ctl_chan.send_command(remote_cmd, expect_string=r"[\$#]")
        self.ssh_ctl_chan._return_cli()

        if "No such file or directory" in remote_out:
            raise IOError("Unable to find file on remote system")

        escape_file_name = re.escape(remote_file)
        pattern = r"^.* ({}).*$".format(escape_file_name)
        match = re.search(pattern, remote_out, flags=re.M)
        if match:
            # Format: -rw-r--r--  1 pyclass  wheel  12 Nov  5 19:07 /var/tmp/test3.txt
            line = match.group(0)
            file_size = line.split()[4]
            return int(file_size)

        raise ValueError(
            "Search pattern not found for remote file size during SCP transfer."
        )

    def file_md5(self, file_name, add_newline=False):
        """Compute MD5 hash of file.

        add_newline is needed to support Cisco IOS MD5 calculation which expects the newline in
        the string

        Args:
            file_name: name of file to get md5 digest of
            add_newline: add newline to end of file contents or not

        Returns:
            Hex digest string of the file contents (plus a trailing newline
            byte when add_newline is true).
        """
        file_hash = hashlib.md5()
        with open(file_name, "rb") as f:
            # Read in small chunks so large images are never held in memory
            for file_contents in iter(lambda: f.read(512), b""):
                file_hash.update(file_contents)
        if add_newline:
            file_hash.update(b"\n")
        return file_hash.hexdigest()

    @staticmethod
    def process_md5(md5_output, pattern=r"=\s+(\S+)"):
        """
        Process the string to retrieve the MD5 hash

        Output from Cisco IOS (ASA is similar)
        .MD5 of flash:file_name Done!
        verify /md5 (flash:file_name) = 410db2a7015eaa42b1fe71f1bf3d59a2

        Raises:
            ValueError: ``pattern`` does not match ``md5_output``.
        """
        match = re.search(pattern, md5_output)
        if match:
            return match.group(1)
        else:
            raise ValueError(f"Invalid output from MD5 command: {md5_output}")

    def compare_md5(self):
        """Compare md5 of file on network device to md5 of local file."""
        if self.direction == "put":
            remote_md5 = self.remote_md5()
            return self.source_md5 == remote_md5
        elif self.direction == "get":
            local_md5 = self.file_md5(self.dest_file)
            return self.source_md5 == local_md5

    def remote_md5(self, base_cmd="verify /md5", remote_file=None):
        """Calculate remote MD5 and returns the hash.

        This command can be CPU intensive on the remote device.

        Args:
            base_cmd: command prefix; the file path
                ``<file_system>/<remote_file>`` is appended to it.
            remote_file: file name; defaults to dest_file for "put" and
                source_file for "get".
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        remote_md5_cmd = f"{base_cmd} {self.file_system}/{remote_file}"
        dest_md5 = self.ssh_ctl_chan.send_command(remote_md5_cmd, max_loops=1500)
        dest_md5 = self.process_md5(dest_md5)
        return dest_md5

    def transfer_file(self):
        """SCP transfer file."""
        if self.direction == "put":
            self.put_file()
        elif self.direction == "get":
            self.get_file()

    def get_file(self):
        """SCP copy the file from the remote device to local system."""
        source_file = f"{self.file_system}/{self.source_file}"
        self.scp_conn.scp_get_file(source_file, self.dest_file)
        self.scp_conn.close()

    def put_file(self):
        """SCP copy the file from the local system to the remote device."""
        destination = f"{self.file_system}/{self.dest_file}"
        self.scp_conn.scp_transfer_file(self.source_file, destination)
        # Must close the SCP connection to get the file written (flush)
        self.scp_conn.close()

    def verify_file(self):
        """Verify the file has been transferred correctly."""
        return self.compare_md5()

    def enable_scp(self, cmd=None):
        """
        Enable SCP on remote device.

        Defaults to Cisco IOS command

        Args:
            cmd: a single command string or an iterable of commands.
        """
        if cmd is None:
            cmd = ["ip scp server enable"]
        elif isinstance(cmd, str) or not hasattr(cmd, "__iter__"):
            # str defines __iter__ on Python 3, so it must be wrapped explicitly
            cmd = [cmd]
        self.ssh_ctl_chan.send_config_set(cmd)

    def disable_scp(self, cmd=None):
        """
        Disable SCP on remote device.

        Defaults to Cisco IOS command

        Args:
            cmd: a single command string or an iterable of commands.
        """
        if cmd is None:
            cmd = ["no ip scp server enable"]
        elif isinstance(cmd, str) or not hasattr(cmd, "__iter__"):
            # str defines __iter__ on Python 3, so it must be wrapped explicitly
            cmd = [cmd]
        self.ssh_ctl_chan.send_config_set(cmd)
Classes
class BaseFileTransfer (ssh_conn, source_file, dest_file, file_system=None, direction='put', socket_timeout=10.0, progress=None, progress4=None, hash_supported=True)
-
Class to manage SCP file transfer and associated SSH control channel.
Source code
class BaseFileTransfer(object): """Class to manage SCP file transfer and associated SSH control channel.""" def __init__( self, ssh_conn, source_file, dest_file, file_system=None, direction="put", socket_timeout=10.0, progress=None, progress4=None, hash_supported=True, ): self.ssh_ctl_chan = ssh_conn self.source_file = source_file self.dest_file = dest_file self.direction = direction self.socket_timeout = socket_timeout self.progress = progress self.progress4 = progress4 auto_flag = ( "cisco_ios" in ssh_conn.device_type or "cisco_xe" in ssh_conn.device_type or "cisco_xr" in ssh_conn.device_type ) if not file_system: if auto_flag: self.file_system = self.ssh_ctl_chan._autodetect_fs() else: raise ValueError("Destination file system not specified") else: self.file_system = file_system if direction == "put": self.source_md5 = self.file_md5(source_file) if hash_supported else None self.file_size = os.stat(source_file).st_size elif direction == "get": self.source_md5 = ( self.remote_md5(remote_file=source_file) if hash_supported else None ) self.file_size = self.remote_file_size(remote_file=source_file) else: raise ValueError("Invalid direction specified") def __enter__(self): """Context manager setup""" self.establish_scp_conn() return self def __exit__(self, exc_type, exc_value, traceback): """Context manager cleanup.""" self.close_scp_chan() def establish_scp_conn(self): """Establish SCP connection.""" self.scp_conn = SCPConn( self.ssh_ctl_chan, socket_timeout=self.socket_timeout, progress=self.progress, progress4=self.progress4, ) def close_scp_chan(self): """Close the SCP connection to the remote network device.""" self.scp_conn.close() self.scp_conn = None def remote_space_available(self, search_pattern=r"(\d+) \w+ free"): """Return space available on remote device.""" remote_cmd = f"dir {self.file_system}" remote_output = self.ssh_ctl_chan.send_command_expect(remote_cmd) match = re.search(search_pattern, remote_output) if "kbytes" in match.group(0) or "Kbytes" in 
match.group(0): return int(match.group(1)) * 1000 return int(match.group(1)) def _remote_space_available_unix(self, search_pattern=""): """Return space available on *nix system (BSD/Linux).""" self.ssh_ctl_chan._enter_shell() remote_cmd = f"/bin/df -k {self.file_system}" remote_output = self.ssh_ctl_chan.send_command( remote_cmd, expect_string=r"[\$#]" ) # Try to ensure parsing is correct: # Filesystem 1K-blocks Used Avail Capacity Mounted on # /dev/bo0s3f 1264808 16376 1147248 1% /cf/var remote_output = remote_output.strip() output_lines = remote_output.splitlines() # First line is the header; second is the actual file system info header_line = output_lines[0] filesystem_line = output_lines[1] if "Filesystem" not in header_line or "Avail" not in header_line.split()[3]: # Filesystem 1K-blocks Used Avail Capacity Mounted on msg = "Parsing error, unexpected output from {}:\n{}".format( remote_cmd, remote_output ) raise ValueError(msg) space_available = filesystem_line.split()[3] if not re.search(r"^\d+$", space_available): msg = "Parsing error, unexpected output from {}:\n{}".format( remote_cmd, remote_output ) raise ValueError(msg) self.ssh_ctl_chan._return_cli() return int(space_available) * 1024 def local_space_available(self): """Return space available on local filesystem.""" if platform.system() == "Windows": import ctypes free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW( ctypes.c_wchar_p("."), None, None, ctypes.pointer(free_bytes) ) return free_bytes.value else: destination_stats = os.statvfs(".") return destination_stats.f_bsize * destination_stats.f_bavail def verify_space_available(self, search_pattern=r"(\d+) \w+ free"): """Verify sufficient space is available on destination file system (return boolean).""" if self.direction == "put": space_avail = self.remote_space_available(search_pattern=search_pattern) elif self.direction == "get": space_avail = self.local_space_available() if space_avail > self.file_size: return True 
return False def check_file_exists(self, remote_cmd=""): """Check if the dest_file already exists on the file system (return boolean).""" if self.direction == "put": if not remote_cmd: remote_cmd = f"dir {self.file_system}/{self.dest_file}" remote_out = self.ssh_ctl_chan.send_command_expect(remote_cmd) search_string = r"Directory of .*{0}".format(self.dest_file) if ( "Error opening" in remote_out or "No such file or directory" in remote_out or "Path does not exist" in remote_out ): return False elif re.search(search_string, remote_out, flags=re.DOTALL): return True else: raise ValueError("Unexpected output from check_file_exists") elif self.direction == "get": return os.path.exists(self.dest_file) def _check_file_exists_unix(self, remote_cmd=""): """Check if the dest_file already exists on the file system (return boolean).""" if self.direction == "put": self.ssh_ctl_chan._enter_shell() remote_cmd = f"ls {self.file_system}" remote_out = self.ssh_ctl_chan.send_command( remote_cmd, expect_string=r"[\$#]" ) self.ssh_ctl_chan._return_cli() return self.dest_file in remote_out elif self.direction == "get": return os.path.exists(self.dest_file) def remote_file_size(self, remote_cmd="", remote_file=None): """Get the file size of the remote file.""" if remote_file is None: if self.direction == "put": remote_file = self.dest_file elif self.direction == "get": remote_file = self.source_file if not remote_cmd: remote_cmd = f"dir {self.file_system}/{remote_file}" remote_out = self.ssh_ctl_chan.send_command(remote_cmd) # Strip out "Directory of flash:/filename line remote_out = re.split(r"Directory of .*", remote_out) remote_out = "".join(remote_out) # Match line containing file name escape_file_name = re.escape(remote_file) pattern = r".*({}).*".format(escape_file_name) match = re.search(pattern, remote_out) if match: line = match.group(0) # Format will be 26 -rw- 6738 Jul 30 2016 19:49:50 -07:00 filename file_size = line.split()[2] else: raise IOError("Unable to parse 'dir' 
output in remote_file_size method") if "Error opening" in remote_out or "No such file or directory" in remote_out: raise IOError("Unable to find file on remote system") else: return int(file_size) def _remote_file_size_unix(self, remote_cmd="", remote_file=None): """Get the file size of the remote file.""" if remote_file is None: if self.direction == "put": remote_file = self.dest_file elif self.direction == "get": remote_file = self.source_file remote_file = f"{self.file_system}/{remote_file}" if not remote_cmd: remote_cmd = f"ls -l {remote_file}" self.ssh_ctl_chan._enter_shell() remote_out = self.ssh_ctl_chan.send_command(remote_cmd, expect_string=r"[\$#]") self.ssh_ctl_chan._return_cli() if "No such file or directory" in remote_out: raise IOError("Unable to find file on remote system") escape_file_name = re.escape(remote_file) pattern = r"^.* ({}).*$".format(escape_file_name) match = re.search(pattern, remote_out, flags=re.M) if match: # Format: -rw-r--r-- 1 pyclass wheel 12 Nov 5 19:07 /var/tmp/test3.txt line = match.group(0) file_size = line.split()[4] return int(file_size) raise ValueError( "Search pattern not found for remote file size during SCP transfer." ) def file_md5(self, file_name, add_newline=False): """Compute MD5 hash of file. add_newline is needed to support Cisco IOS MD5 calculation which expects the newline in the string Args: file_name: name of file to get md5 digest of add_newline: add newline to end of file contents or not """ file_hash = hashlib.md5() with open(file_name, "rb") as f: while True: file_contents = f.read(512) if not file_contents: if add_newline: file_contents + b"\n" break file_hash.update(file_contents) return file_hash.hexdigest() @staticmethod def process_md5(md5_output, pattern=r"=\s+(\S+)"): """ Process the string to retrieve the MD5 hash Output from Cisco IOS (ASA is similar) .MD5 of flash:file_name Done! 
verify /md5 (flash:file_name) = 410db2a7015eaa42b1fe71f1bf3d59a2 """ match = re.search(pattern, md5_output) if match: return match.group(1) else: raise ValueError(f"Invalid output from MD5 command: {md5_output}") def compare_md5(self): """Compare md5 of file on network device to md5 of local file.""" if self.direction == "put": remote_md5 = self.remote_md5() return self.source_md5 == remote_md5 elif self.direction == "get": local_md5 = self.file_md5(self.dest_file) return self.source_md5 == local_md5 def remote_md5(self, base_cmd="verify /md5", remote_file=None): """Calculate remote MD5 and returns the hash. This command can be CPU intensive on the remote device. """ if remote_file is None: if self.direction == "put": remote_file = self.dest_file elif self.direction == "get": remote_file = self.source_file remote_md5_cmd = f"{base_cmd} {self.file_system}/{remote_file}" dest_md5 = self.ssh_ctl_chan.send_command(remote_md5_cmd, max_loops=1500) dest_md5 = self.process_md5(dest_md5) return dest_md5 def transfer_file(self): """SCP transfer file.""" if self.direction == "put": self.put_file() elif self.direction == "get": self.get_file() def get_file(self): """SCP copy the file from the remote device to local system.""" source_file = f"{self.file_system}/{self.source_file}" self.scp_conn.scp_get_file(source_file, self.dest_file) self.scp_conn.close() def put_file(self): """SCP copy the file from the local system to the remote device.""" destination = f"{self.file_system}/{self.dest_file}" self.scp_conn.scp_transfer_file(self.source_file, destination) # Must close the SCP connection to get the file written (flush) self.scp_conn.close() def verify_file(self): """Verify the file has been transferred correctly.""" return self.compare_md5() def enable_scp(self, cmd=None): """ Enable SCP on remote device. 
Defaults to Cisco IOS command """ if cmd is None: cmd = ["ip scp server enable"] elif not hasattr(cmd, "__iter__"): cmd = [cmd] self.ssh_ctl_chan.send_config_set(cmd) def disable_scp(self, cmd=None): """ Disable SCP on remote device. Defaults to Cisco IOS command """ if cmd is None: cmd = ["no ip scp server enable"] elif not hasattr(cmd, "__iter__"): cmd = [cmd] self.ssh_ctl_chan.send_config_set(cmd)
Subclasses
- CiscoFileTransfer
- CienaSaosFileTransfer
- DellOS10FileTransfer
- JuniperFileTransfer
- NokiaSrosFileTransfer
Static methods
def process_md5(md5_output, pattern='=\\s+(\\S+)')
-
Process the string to retrieve the MD5 hash
Example output from Cisco IOS (ASA is similar): the line ".MD5 of flash:file_name Done!" followed by the line "verify /md5 (flash:file_name) = 410db2a7015eaa42b1fe71f1bf3d59a2".
Source code
@staticmethod def process_md5(md5_output, pattern=r"=\s+(\S+)"): """ Process the string to retrieve the MD5 hash Output from Cisco IOS (ASA is similar) .MD5 of flash:file_name Done! verify /md5 (flash:file_name) = 410db2a7015eaa42b1fe71f1bf3d59a2 """ match = re.search(pattern, md5_output) if match: return match.group(1) else: raise ValueError(f"Invalid output from MD5 command: {md5_output}")
Methods
def check_file_exists(self, remote_cmd='')
-
Check if the dest_file already exists on the file system (return boolean).
Source code
def check_file_exists(self, remote_cmd=""):
    """Check if the dest_file already exists on the file system (return boolean).

    For "put" the remote device is queried (default command
    ``dir <file_system>/<dest_file>``); for "get" the local path is checked.

    Raises:
        ValueError: the remote output is neither a known error message nor
            a "Directory of ..." listing mentioning dest_file.
    """
    if self.direction == "put":
        if not remote_cmd:
            remote_cmd = f"dir {self.file_system}/{self.dest_file}"
        remote_out = self.ssh_ctl_chan.send_command_expect(remote_cmd)
        # Escape the file name: it is literal text, not a regex fragment
        search_string = r"Directory of .*{0}".format(re.escape(self.dest_file))
        if (
            "Error opening" in remote_out
            or "No such file or directory" in remote_out
            or "Path does not exist" in remote_out
        ):
            return False
        elif re.search(search_string, remote_out, flags=re.DOTALL):
            return True
        else:
            raise ValueError("Unexpected output from check_file_exists")
    elif self.direction == "get":
        return os.path.exists(self.dest_file)
def close_scp_chan(self)
-
Close the SCP connection to the remote network device.
Source code
def close_scp_chan(self): """Close the SCP connection to the remote network device.""" self.scp_conn.close() self.scp_conn = None
def compare_md5(self)
-
Compare md5 of file on network device to md5 of local file.
Source code
def compare_md5(self): """Compare md5 of file on network device to md5 of local file.""" if self.direction == "put": remote_md5 = self.remote_md5() return self.source_md5 == remote_md5 elif self.direction == "get": local_md5 = self.file_md5(self.dest_file) return self.source_md5 == local_md5
def disable_scp(self, cmd=None)
-
Disable SCP on remote device.
Defaults to Cisco IOS command
Source code
def disable_scp(self, cmd=None): """ Disable SCP on remote device. Defaults to Cisco IOS command """ if cmd is None: cmd = ["no ip scp server enable"] elif not hasattr(cmd, "__iter__"): cmd = [cmd] self.ssh_ctl_chan.send_config_set(cmd)
def enable_scp(self, cmd=None)
-
Enable SCP on remote device.
Defaults to Cisco IOS command
Source code
def enable_scp(self, cmd=None): """ Enable SCP on remote device. Defaults to Cisco IOS command """ if cmd is None: cmd = ["ip scp server enable"] elif not hasattr(cmd, "__iter__"): cmd = [cmd] self.ssh_ctl_chan.send_config_set(cmd)
def establish_scp_conn(self)
-
Establish SCP connection.
Source code
def establish_scp_conn(self): """Establish SCP connection.""" self.scp_conn = SCPConn( self.ssh_ctl_chan, socket_timeout=self.socket_timeout, progress=self.progress, progress4=self.progress4, )
def file_md5(self, file_name, add_newline=False)
-
Compute MD5 hash of file.
add_newline is needed to support Cisco IOS MD5 calculation which expects the newline in the string
Args
file_name
- name of file to get md5 digest of
add_newline
- add newline to end of file contents or not
Source code
def file_md5(self, file_name, add_newline=False):
    """Compute MD5 hash of file.

    add_newline is needed to support Cisco IOS MD5 calculation which expects the newline in
    the string

    Args:
        file_name: name of file to get md5 digest of
        add_newline: add newline to end of file contents or not

    Returns:
        Hex digest string of the file contents (plus a trailing newline
        byte when add_newline is true).
    """
    file_hash = hashlib.md5()
    with open(file_name, "rb") as f:
        # Read in small chunks so large images are never held in memory
        for file_contents in iter(lambda: f.read(512), b""):
            file_hash.update(file_contents)
    if add_newline:
        file_hash.update(b"\n")
    return file_hash.hexdigest()
def get_file(self)
-
SCP copy the file from the remote device to local system.
Source code
def get_file(self): """SCP copy the file from the remote device to local system.""" source_file = f"{self.file_system}/{self.source_file}" self.scp_conn.scp_get_file(source_file, self.dest_file) self.scp_conn.close()
def local_space_available(self)
-
Return space available on local filesystem.
Source code
def local_space_available(self): """Return space available on local filesystem.""" if platform.system() == "Windows": import ctypes free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW( ctypes.c_wchar_p("."), None, None, ctypes.pointer(free_bytes) ) return free_bytes.value else: destination_stats = os.statvfs(".") return destination_stats.f_bsize * destination_stats.f_bavail
def put_file(self)
-
SCP copy the file from the local system to the remote device.
Source code
def put_file(self): """SCP copy the file from the local system to the remote device.""" destination = f"{self.file_system}/{self.dest_file}" self.scp_conn.scp_transfer_file(self.source_file, destination) # Must close the SCP connection to get the file written (flush) self.scp_conn.close()
def remote_file_size(self, remote_cmd='', remote_file=None)
-
Get the file size of the remote file.
Source code
def remote_file_size(self, remote_cmd="", remote_file=None):
    """Get the file size of the remote file.

    Args:
        remote_cmd: command to run; defaults to
            ``dir <file_system>/<remote_file>``.
        remote_file: file name; defaults to dest_file for "put" and
            source_file for "get".

    Raises:
        IOError: the device reports the file as missing, or the listing
            cannot be parsed.
    """
    if remote_file is None:
        if self.direction == "put":
            remote_file = self.dest_file
        elif self.direction == "get":
            remote_file = self.source_file
    if not remote_cmd:
        remote_cmd = f"dir {self.file_system}/{remote_file}"
    remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
    # Strip out "Directory of flash:/filename line
    remote_out = "".join(re.split(r"Directory of .*", remote_out))

    # Report a missing file as such before attempting to parse a size
    if "Error opening" in remote_out or "No such file or directory" in remote_out:
        raise IOError("Unable to find file on remote system")

    # Match line containing file name
    escape_file_name = re.escape(remote_file)
    pattern = r".*({}).*".format(escape_file_name)
    match = re.search(pattern, remote_out)
    if not match:
        raise IOError("Unable to parse 'dir' output in remote_file_size method")
    # Format will be 26  -rw-   6738  Jul 30 2016 19:49:50 -07:00  filename
    fields = match.group(0).split()
    try:
        return int(fields[2])
    except (IndexError, ValueError) as exc:
        raise IOError(
            "Unable to parse 'dir' output in remote_file_size method"
        ) from exc
def remote_md5(self, base_cmd='verify /md5', remote_file=None)
-
Calculate remote MD5 and returns the hash.
This command can be CPU intensive on the remote device.
Source code
def remote_md5(self, base_cmd="verify /md5", remote_file=None): """Calculate remote MD5 and returns the hash. This command can be CPU intensive on the remote device. """ if remote_file is None: if self.direction == "put": remote_file = self.dest_file elif self.direction == "get": remote_file = self.source_file remote_md5_cmd = f"{base_cmd} {self.file_system}/{remote_file}" dest_md5 = self.ssh_ctl_chan.send_command(remote_md5_cmd, max_loops=1500) dest_md5 = self.process_md5(dest_md5) return dest_md5
def remote_space_available(self, search_pattern='(\\d+) \\w+ free')
-
Return space available on remote device.
Source code
def remote_space_available(self, search_pattern=r"(\d+) \w+ free"):
    """Return space available on remote device.

    Runs ``dir <file_system>`` on the control channel and extracts the first
    group of ``search_pattern``. When the matched text mentions
    "kbytes"/"Kbytes" the number is multiplied by 1000.

    Raises:
        ValueError: ``search_pattern`` does not match the command output.
    """
    remote_cmd = f"dir {self.file_system}"
    remote_output = self.ssh_ctl_chan.send_command_expect(remote_cmd)
    match = re.search(search_pattern, remote_output)
    if match is None:
        raise ValueError(
            "Parsing error, unexpected output from {}:\n{}".format(
                remote_cmd, remote_output
            )
        )
    if "kbytes" in match.group(0) or "Kbytes" in match.group(0):
        return int(match.group(1)) * 1000
    return int(match.group(1))
def transfer_file(self)
-
SCP transfer file.
Source code
def transfer_file(self): """SCP transfer file.""" if self.direction == "put": self.put_file() elif self.direction == "get": self.get_file()
def verify_file(self)
-
Verify the file has been transferred correctly.
Source code
def verify_file(self): """Verify the file has been transferred correctly.""" return self.compare_md5()
def verify_space_available(self, search_pattern='(\\d+) \\w+ free')
-
Verify sufficient space is available on destination file system (return boolean).
Source code
def verify_space_available(self, search_pattern=r"(\d+) \w+ free"): """Verify sufficient space is available on destination file system (return boolean).""" if self.direction == "put": space_avail = self.remote_space_available(search_pattern=search_pattern) elif self.direction == "get": space_avail = self.local_space_available() if space_avail > self.file_size: return True return False
class SCPConn (ssh_conn, socket_timeout=10.0, progress=None, progress4=None)
-
Establish a secure copy channel to the remote network device.
The SCP connection must be closed for the transferred file to be written to the remote filesystem.
Source code
class SCPConn(object): """ Establish a secure copy channel to the remote network device. Must close the SCP connection to get the file to write to the remote filesystem """ def __init__(self, ssh_conn, socket_timeout=10.0, progress=None, progress4=None): self.ssh_ctl_chan = ssh_conn self.socket_timeout = socket_timeout self.progress = progress self.progress4 = progress4 self.establish_scp_conn() def establish_scp_conn(self): """Establish the secure copy connection.""" ssh_connect_params = self.ssh_ctl_chan._connect_params_dict() self.scp_conn = self.ssh_ctl_chan._build_ssh_client() self.scp_conn.connect(**ssh_connect_params) self.scp_client = scp.SCPClient( self.scp_conn.get_transport(), socket_timeout=self.socket_timeout, progress=self.progress, progress4=self.progress4, ) def scp_transfer_file(self, source_file, dest_file): """Put file using SCP (for backwards compatibility).""" self.scp_client.put(source_file, dest_file) def scp_get_file(self, source_file, dest_file): """Get file using SCP.""" platform = self.ssh_ctl_chan.device_type if "cisco_ios" in platform or "cisco_xe" in platform: try: self.scp_client.get(source_file, dest_file) except EOFError: pass else: self.scp_client.get(source_file, dest_file) def scp_put_file(self, source_file, dest_file): """Put file using SCP.""" self.scp_client.put(source_file, dest_file) def close(self): """Close the SCP connection.""" self.scp_conn.close()
Methods
def close(self)
-
Close the SCP connection.
Source code
def close(self): """Close the SCP connection.""" self.scp_conn.close()
def establish_scp_conn(self)
-
Establish the secure copy connection.
Source code
def establish_scp_conn(self): """Establish the secure copy connection.""" ssh_connect_params = self.ssh_ctl_chan._connect_params_dict() self.scp_conn = self.ssh_ctl_chan._build_ssh_client() self.scp_conn.connect(**ssh_connect_params) self.scp_client = scp.SCPClient( self.scp_conn.get_transport(), socket_timeout=self.socket_timeout, progress=self.progress, progress4=self.progress4, )
def scp_get_file(self, source_file, dest_file)
-
Get file using SCP.
Source code
def scp_get_file(self, source_file, dest_file): """Get file using SCP.""" platform = self.ssh_ctl_chan.device_type if "cisco_ios" in platform or "cisco_xe" in platform: try: self.scp_client.get(source_file, dest_file) except EOFError: pass else: self.scp_client.get(source_file, dest_file)
def scp_put_file(self, source_file, dest_file)
-
Put file using SCP.
Source code
def scp_put_file(self, source_file, dest_file): """Put file using SCP.""" self.scp_client.put(source_file, dest_file)
def scp_transfer_file(self, source_file, dest_file)
-
Put file using SCP (for backwards compatibility).
Source code
def scp_transfer_file(self, source_file, dest_file): """Put file using SCP (for backwards compatibility).""" self.scp_client.put(source_file, dest_file)