Module scrapli.channel.async_channel
scrapli.channel.async_channel
Expand source code
"""scrapli.channel.async_channel"""
import asyncio
import re
import time
from io import SEEK_END, BytesIO
try:
from contextlib import asynccontextmanager
except ImportError: # pragma: nocover
# needed for 3.6 support, no asynccontextmanager until 3.7
from async_generator import asynccontextmanager # type: ignore # pragma: nocover
from datetime import datetime
from typing import AsyncIterator, List, Optional, Tuple
from scrapli.channel.base_channel import BaseChannel, BaseChannelArgs
from scrapli.decorators import ChannelTimeout
from scrapli.exceptions import ScrapliAuthenticationFailed, ScrapliTimeout
from scrapli.transport.base import AsyncTransport
class AsyncChannel(BaseChannel):
def __init__(
    self,
    transport: AsyncTransport,
    base_channel_args: BaseChannelArgs,
) -> None:
    """
    Set up the async channel and, when requested, the lock guarding public operations

    Args:
        transport: async transport handed on to the base channel
        base_channel_args: channel settings; its `channel_lock` flag decides whether an
            `asyncio.Lock` is created for this channel

    Returns:
        None

    Raises:
        N/A

    """
    super().__init__(transport=transport, base_channel_args=base_channel_args)

    # annotation only: narrows the attribute type for type checkers, assigns nothing
    self.transport: AsyncTransport

    lock_requested = self._base_channel_args.channel_lock
    self.channel_lock: Optional[asyncio.Lock] = asyncio.Lock() if lock_requested else None
@asynccontextmanager
async def _channel_lock(self) -> AsyncIterator[None]:
    """
    Hold the channel lock around the `async with` body, if a lock was created

    Args:
        N/A

    Yields:
        None

    Raises:
        N/A

    """
    lock = self.channel_lock

    if not lock:
        # locking is disabled, act as a do-nothing context manager
        yield
        return

    async with lock:
        yield
async def read(self) -> bytes:
    r"""
    Read one chunk of output from the transport and clean it up

    All \r bytes are removed, the chunk is logged at debug level and, when a channel log is
    set, written to it. Only after that is the chunk passed through `_strip_ansi` (if it
    contains an escape byte), so the channel log receives the chunk with just the \r bytes
    removed.

    Args:
        N/A

    Returns:
        bytes: output read from channel

    Raises:
        N/A

    """
    buf = (await self.transport.read()).replace(b"\r", b"")

    self.logger.debug(f"read: {repr(buf)}")

    if self.channel_log:
        self.channel_log.write(buf)

    # ESC (0x1b) has no upper/lower case form, so the chunk is searched directly instead of
    # building a lowercased copy of it on every read
    if b"\x1b" in buf:
        buf = self._strip_ansi(buf=buf)

    return buf
async def _read_until_input(self, channel_input: bytes) -> bytes:
    """
    Keep reading until the channel has echoed back everything in channel_input

    Args:
        channel_input: bytes that should have been written to the channel

    Returns:
        bytes: everything read while waiting for the input to show up; empty when
            channel_input is empty (nothing is read in that case)

    Raises:
        N/A

    """
    if not channel_input:
        return b""

    # lowercased input with every run of whitespace removed; used for comparison only
    wanted = b"".join(channel_input.lower().split())

    collected = b""
    while True:
        collected += await self.read()

        # backspace chars (particular problem w/ junos) and any added whitespace are dropped
        # from a throwaway copy; the bytes handed back to the caller stay untouched
        comparable = b"".join(collected.lower().replace(b"\x08", b"").split())
        if wanted in comparable:
            return collected
async def _read_until_prompt(self, buf: bytes = b"") -> bytes:
    """
    Read until expected prompt is seen.

    This reads until the "normal" `_base_channel_args.comms_prompt_pattern` is seen. The
    `_read_until_explicit_prompt` method can be used to read until some pattern in an arbitrary
    list of patterns is seen.

    Args:
        buf: output from previous reads if needed (used by scrapli netconf); newly read output
            is appended after it

    Returns:
        bytes: `buf` followed by everything read from the channel

    Raises:
        N/A

    """
    search_pattern = self._get_prompt_pattern(
        class_pattern=self._base_channel_args.comms_prompt_pattern
    )

    read_buf = BytesIO(buf)
    # a BytesIO created from initial bytes is positioned at offset 0; without this seek the
    # first chunk written below would overwrite the start of `buf` instead of following it
    read_buf.seek(0, SEEK_END)

    while True:
        read_buf.write(await self.read())

        # only the last `comms_prompt_search_depth` bytes are searched for the prompt; the
        # read() leaves the position at the end, so the next write appends
        read_buf.seek(-self._base_channel_args.comms_prompt_search_depth, SEEK_END)
        search_buf = read_buf.read()

        if re.search(pattern=search_pattern, string=search_buf):
            return read_buf.getvalue()
async def _read_until_explicit_prompt(self, prompts: List[str]) -> bytes:
    """
    Read until any one of the given prompt patterns is seen.

    Unlike `_read_until_prompt`, which waits for the "standard" pattern held in
    `_base_channel_args.comms_prompt_pattern`, this waits for *explicit* patterns supplied by
    the caller. Generally this is only used for `send_interactive` though it could be used
    elsewhere as well.

    Args:
        prompts: list of prompt patterns to look for, will return upon seeing any match

    Returns:
        bytes: output read from channel

    Raises:
        N/A

    """
    class_pattern = self._base_channel_args.comms_prompt_pattern
    candidate_patterns = [
        self._get_prompt_pattern(class_pattern=class_pattern, pattern=prompt)
        for prompt in prompts
    ]

    collected = BytesIO(b"")

    while True:
        collected.write(await self.read())

        # look only at the last `comms_prompt_search_depth` bytes read so far
        collected.seek(-self._base_channel_args.comms_prompt_search_depth, SEEK_END)
        tail = collected.read()

        if any(re.search(pattern=candidate, string=tail) for candidate in candidate_patterns):
            return collected.getvalue()
async def _read_until_prompt_or_time(
    self,
    buf: bytes = b"",
    channel_outputs: Optional[List[bytes]] = None,
    read_duration: Optional[float] = None,
) -> bytes:
    """
    Read until expected prompt is seen, outputs are seen, or for duration, whichever comes first

    As transport reading may block, transport timeout is temporarily set to the read_duration
    and any `ScrapliTimeout` that is raised while reading is ignored. The previous transport
    timeout is put back when this method exits, whether it returns or raises.

    Args:
        buf: bytes from previous reads if needed; newly read output is appended after it
        channel_outputs: List of bytes to search for in channel output, if any are seen, return
            read output
        read_duration: duration to read from channel for, 2.5 seconds when not given

    Returns:
        bytes: `buf` followed by everything read from the channel

    Raises:
        N/A

    """
    search_pattern = self._get_prompt_pattern(
        class_pattern=self._base_channel_args.comms_prompt_pattern,
    )

    if channel_outputs is None:
        channel_outputs = []
    if read_duration is None:
        read_duration = 2.5

    regex_channel_outputs_pattern = self._join_and_compile(channel_outputs=channel_outputs)

    _transport_args = self.transport._base_transport_args  # pylint: disable=W0212
    previous_timeout_transport = _transport_args.timeout_transport
    # NOTE(review): int() truncates fractional durations (the default 2.5 becomes 2 and
    # anything below 1 becomes 0) -- confirm how the transport treats a timeout of 0
    _transport_args.timeout_transport = int(read_duration)

    read_buf = BytesIO(buf)
    # a BytesIO created from initial bytes is positioned at offset 0; without this seek the
    # first chunk written below would overwrite the start of `buf` instead of following it
    read_buf.seek(0, SEEK_END)
    start = time.time()

    try:
        while True:
            try:
                read_buf.write(await self.read())
            except ScrapliTimeout:
                # a timed out read just means no new output arrived in this window
                pass

            # only the last `comms_prompt_search_depth` bytes are inspected
            read_buf.seek(-self._base_channel_args.comms_prompt_search_depth, SEEK_END)
            search_buf = read_buf.read()

            if (time.time() - start) > read_duration:
                break
            if any((channel_output in search_buf for channel_output in channel_outputs)):
                break
            if re.search(pattern=regex_channel_outputs_pattern, string=search_buf):
                break
            if re.search(pattern=search_pattern, string=search_buf):
                break
    finally:
        # restore the caller's timeout even if a read fails or the task is cancelled, so the
        # shortened timeout never leaks into later operations
        _transport_args.timeout_transport = previous_timeout_transport

    return read_buf.getvalue()
@ChannelTimeout(message="timed out during in channel ssh authentication")
async def channel_authenticate_ssh(
    self, auth_password: str, auth_private_key_passphrase: str
) -> None:
    """
    Handle SSH Authentication for transports that only operate "in the channel" (i.e. system)

    Output is read in one second slices and lowercased; password and key passphrase prompts
    are answered as they appear, and the method returns once the channel prompt pattern
    matches what has been buffered.

    Args:
        auth_password: password to authenticate with
        auth_private_key_passphrase: passphrase for ssh key if necessary

    Returns:
        None

    Raises:
        ScrapliAuthenticationFailed: if the password prompt shows up a third time
        ScrapliAuthenticationFailed: if the passphrase prompt shows up a third time

    """
    self.logger.debug("attempting in channel ssh authentication")

    prompt_pattern = self._get_prompt_pattern(
        class_pattern=self._base_channel_args.comms_prompt_pattern
    )
    seen_password_prompts = 0
    seen_passphrase_prompts = 0
    pending = b""

    async with self._channel_lock():
        while True:
            try:
                chunk = await asyncio.wait_for(self.read(), timeout=1)
            except asyncio.TimeoutError:
                # nothing arrived within this slice; carry on with what is buffered
                chunk = b""

            pending += chunk.lower()

            if b"password" in pending:
                # forget what was buffered so the same password prompt is not answered twice
                pending = b""
                seen_password_prompts += 1
                if seen_password_prompts > 2:
                    msg = "password prompt seen more than once, assuming auth failed"
                    self.logger.critical(msg)
                    raise ScrapliAuthenticationFailed(msg)
                self.write(channel_input=auth_password, redacted=True)
                self.send_return()

            if b"enter passphrase for key" in pending:
                # forget what was buffered so the same passphrase prompt is not answered twice
                pending = b""
                seen_passphrase_prompts += 1
                if seen_passphrase_prompts > 2:
                    msg = "passphrase prompt seen more than once, assuming auth failed"
                    self.logger.critical(msg)
                    raise ScrapliAuthenticationFailed(msg)
                self.write(channel_input=auth_private_key_passphrase, redacted=True)
                self.send_return()

            if re.search(pattern=prompt_pattern, string=pending):
                return
@ChannelTimeout(message="timed out during in channel telnet authentication")
async def channel_authenticate_telnet(  # noqa: C901
    self, auth_username: str = "", auth_password: str = ""
) -> None:
    """
    Handle Telnet Authentication

    Username and password prompts are answered as they appear on the channel. While reads
    come back empty, a return is sent each time another `return_interval` has elapsed since
    authentication started. The method returns once the prompt pattern provided by
    `_pre_channel_authenticate_telnet` matches what has been buffered.

    Args:
        auth_username: username to use for telnet authentication
        auth_password: password to use for telnet authentication

    Returns:
        None

    Raises:
        ScrapliAuthenticationFailed: if the password prompt shows up a third time
        ScrapliAuthenticationFailed: if the username/login prompt shows up a third time

    """
    self.logger.debug("attempting in channel telnet authentication")

    (
        username_pattern,
        password_pattern,
        prompt_pattern,
        auth_start_time,
        return_interval,
    ) = self._pre_channel_authenticate_telnet()

    # each read waits for at most one twentieth of the ops timeout
    read_interval = self._base_channel_args.timeout_ops / 20

    seen_username_prompts = 0
    seen_password_prompts = 0
    returns_due = 1
    pending = b""

    async with self._channel_lock():
        while True:
            try:
                chunk = await asyncio.wait_for(self.read(), timeout=read_interval)
            except asyncio.TimeoutError:
                chunk = b""

            if not chunk:
                # nothing was read: send a return once the next return interval has passed
                elapsed = datetime.now().timestamp() - auth_start_time
                if elapsed > (return_interval * returns_due):
                    self.send_return()
                    returns_due += 1

            pending += chunk.lower()

            if re.search(pattern=username_pattern, string=pending):
                # forget what was buffered so the same username prompt is not answered twice
                pending = b""
                seen_username_prompts += 1
                if seen_username_prompts > 2:
                    msg = "username/login prompt seen more than once, assuming auth failed"
                    self.logger.critical(msg)
                    raise ScrapliAuthenticationFailed(msg)
                self.write(channel_input=auth_username)
                self.send_return()

            if re.search(pattern=password_pattern, string=pending):
                # forget what was buffered so the same password prompt is not answered twice
                pending = b""
                seen_password_prompts += 1
                if seen_password_prompts > 2:
                    msg = "password prompt seen more than once, assuming auth failed"
                    self.logger.critical(msg)
                    raise ScrapliAuthenticationFailed(msg)
                self.write(channel_input=auth_password, redacted=True)
                self.send_return()

            if re.search(pattern=prompt_pattern, string=pending):
                return
@ChannelTimeout(message="timed out getting prompt")
async def get_prompt(self) -> str:
    """
    Send a return and read until the prompt pattern matches, then hand back the match

    Args:
        N/A

    Returns:
        str: the matched prompt, decoded and stripped of surrounding whitespace

    Raises:
        N/A

    """
    prompt_pattern = self._get_prompt_pattern(
        class_pattern=self._base_channel_args.comms_prompt_pattern
    )
    collected = b""

    async with self._channel_lock():
        self.send_return()

        while True:
            collected += await self.read()

            found = re.search(pattern=prompt_pattern, string=collected)
            if found:
                return found.group(0).decode().strip()
@ChannelTimeout(message="timed out sending input to device")
async def send_input(
    self,
    channel_input: str,
    *,
    strip_prompt: bool = True,
    eager: bool = False,
) -> Tuple[bytes, bytes]:
    """
    Primary entry point to send data to devices in shell mode; accept input and returns result

    The input is written, the channel is read until the input has been echoed, a return is
    sent, and (unless eager) output is read up to the prompt. The echoed input itself is not
    part of the returned output.

    Args:
        channel_input: string input to send to channel
        strip_prompt: strip prompt or not, defaults to True (yes, strip the prompt)
        eager: eager mode does not attempt to read to the prompt pattern after sending the
            return -- this should not be used manually! (only used by `send_configs` with the
            eager flag set)

    Returns:
        Tuple[bytes, bytes]: tuple of "raw" output and "processed" (cleaned up/stripped) output

    Raises:
        N/A

    """
    self._pre_send_input(channel_input=channel_input)

    self.logger.info(
        f"sending channel input: {channel_input}; strip_prompt: {strip_prompt}; eager: {eager}"
    )

    raw_output = b""

    async with self._channel_lock():
        self.write(channel_input=channel_input)
        # consume the echo of what was just written; its content is not needed
        await self._read_until_input(channel_input=channel_input.encode())
        self.send_return()

        if not eager:
            raw_output += await self._read_until_prompt()

    return raw_output, self._process_output(buf=raw_output, strip_prompt=strip_prompt)
@ChannelTimeout(message="timed out sending input to device")
async def send_input_and_read(
    self,
    channel_input: str,
    *,
    strip_prompt: bool = True,
    expected_outputs: Optional[List[str]] = None,
    read_duration: Optional[float] = None,
) -> Tuple[bytes, bytes]:
    """
    Send a command and read until expected prompt is seen, outputs are seen, or for duration

    Args:
        channel_input: string input to send to channel
        strip_prompt: strip prompt or not, defaults to True (yes, strip the prompt)
        expected_outputs: list of strings to look for in output; if any of these are seen,
            return output read up till that read
        read_duration: float duration to read for

    Returns:
        Tuple[bytes, bytes]: tuple of "raw" output and "processed" (cleaned up/stripped) output

    Raises:
        N/A

    """
    self._pre_send_input(channel_input=channel_input)

    # the lower level read helper works on bytes, so encode the expected outputs up front
    encoded_outputs = [expected.encode() for expected in expected_outputs or []]

    self.logger.info(
        f"sending channel input and read: {channel_input}; strip_prompt: {strip_prompt}; "
        f"expected_outputs: {expected_outputs}; read_duration: {read_duration}"
    )

    raw_output = b""

    async with self._channel_lock():
        self.write(channel_input=channel_input)
        # consume the echo of what was just written; its content is not needed
        await self._read_until_input(channel_input=channel_input.encode())
        self.send_return()

        raw_output += await self._read_until_prompt_or_time(
            channel_outputs=encoded_outputs, read_duration=read_duration
        )

    return raw_output, self._process_output(buf=raw_output, strip_prompt=strip_prompt)
@ChannelTimeout(message="timed out sending interactive input to device")
async def send_inputs_interact(
    self,
    interact_events: List[Tuple[str, str, Optional[bool]]],
    *,
    interaction_complete_patterns: Optional[List[str]] = None,
) -> Tuple[bytes, bytes]:
    """
    Interact with a device with changing prompts per input.

    Some commands answer with a different prompt after every input, and some of the inputs
    (a password for example) are never echoed back. This method walks through a list of
    "interact events" to handle such exchanges, for instance the confirmation for the command
    "clear logging" on IOSXE devices. Each event is a tuple of two or three elements: the input
    to send (str), the response to expect (str), and optionally a bool marking the input as
    "hidden" (i.e. password input).

    An example where we need this sort of capability:

    '''
    3560CX#copy flash: scp:
    Source filename []? test1.txt
    Address or name of remote host []? 172.31.254.100
    Destination username [carl]?
    Writing test1.txt
    Password:
    Password:
    Sink: C0644 639 test1.txt
    !
    639 bytes copied in 12.066 secs (53 bytes/sec)
    3560CX#
    '''

    To accomplish this we can use the following:

    '''
    interact = await conn.channel.send_inputs_interact(
        [
            ("copy flash: scp:", "Source filename []?", False),
            ("test1.txt", "Address or name of remote host []?", False),
            ("172.31.254.100", "Destination username [carl]?", False),
            ("carl", "Password:", False),
            ("super_secure_password", prompt, True),
        ]
    )
    '''

    If we needed to deal with more prompts we could simply continue adding tuples to the list of
    interact "events".

    Args:
        interact_events: list of tuples containing the "interactions" with the device
            each list element must have an input and an expected response, and may have an
            optional bool for the third and final element -- the optional bool specifies if the
            input that is sent to the device is "hidden" (ex: password), if the hidden param is
            not provided it is assumed the input is "normal" (not hidden)
        interaction_complete_patterns: list of patterns, that if seen, indicate the interactive
            "session" has ended and we should exit the interactive session.

    Returns:
        Tuple[bytes, bytes]: output read from the channel with no whitespace trimming/cleaning,
            and the output read from the channel that has been "cleaned up"

    Raises:
        N/A

    """
    self._pre_send_inputs_interact(interact_events=interact_events)

    buf = b""
    processed_buf = b""

    async with self._channel_lock():
        for interact_event in interact_events:
            channel_input, channel_response = interact_event[0], interact_event[1]

            # a missing third element means the input is "normal" (not hidden)
            hidden_input = interact_event[2] if len(interact_event) > 2 else False

            # any completion pattern ends this read just like the expected response does
            prompts = [channel_response]
            if interaction_complete_patterns is not None:
                prompts += interaction_complete_patterns

            # keep hidden input out of the log output
            _channel_input = "REDACTED" if hidden_input else channel_input
            self.logger.info(
                f"sending interactive input: {_channel_input}; "
                f"expecting: {channel_response}; "
                f"hidden_input: {hidden_input}"
            )

            self.write(channel_input=channel_input, redacted=bool(hidden_input))

            if channel_response and hidden_input is not True:
                # the input is expected to be echoed: read it back before sending the return
                buf += await self._read_until_input(channel_input=channel_input.encode())
            self.send_return()

            buf += await self._read_until_explicit_prompt(prompts=prompts)

    processed_buf += self._process_output(
        buf=buf,
        strip_prompt=False,
    )
    return buf, processed_buf
Classes
AsyncChannel
Expand source code
class AsyncChannel(BaseChannel):
def __init__(
    self,
    transport: AsyncTransport,
    base_channel_args: BaseChannelArgs,
) -> None:
    """
    Set up the async channel and, when requested, the lock guarding public operations

    Args:
        transport: async transport handed on to the base channel
        base_channel_args: channel settings; its `channel_lock` flag decides whether an
            `asyncio.Lock` is created for this channel

    Returns:
        None

    Raises:
        N/A

    """
    super().__init__(transport=transport, base_channel_args=base_channel_args)

    # annotation only: narrows the attribute type for type checkers, assigns nothing
    self.transport: AsyncTransport

    lock_requested = self._base_channel_args.channel_lock
    self.channel_lock: Optional[asyncio.Lock] = asyncio.Lock() if lock_requested else None
@asynccontextmanager
async def _channel_lock(self) -> AsyncIterator[None]:
    """
    Hold the channel lock around the `async with` body, if a lock was created

    Args:
        N/A

    Yields:
        None

    Raises:
        N/A

    """
    lock = self.channel_lock

    if not lock:
        # locking is disabled, act as a do-nothing context manager
        yield
        return

    async with lock:
        yield
async def read(self) -> bytes:
    r"""
    Read one chunk of output from the transport and clean it up

    All \r bytes are removed, the chunk is logged at debug level and, when a channel log is
    set, written to it. Only after that is the chunk passed through `_strip_ansi` (if it
    contains an escape byte), so the channel log receives the chunk with just the \r bytes
    removed.

    Args:
        N/A

    Returns:
        bytes: output read from channel

    Raises:
        N/A

    """
    buf = (await self.transport.read()).replace(b"\r", b"")

    self.logger.debug(f"read: {repr(buf)}")

    if self.channel_log:
        self.channel_log.write(buf)

    # ESC (0x1b) has no upper/lower case form, so the chunk is searched directly instead of
    # building a lowercased copy of it on every read
    if b"\x1b" in buf:
        buf = self._strip_ansi(buf=buf)

    return buf
async def _read_until_input(self, channel_input: bytes) -> bytes:
    """
    Keep reading until the channel has echoed back everything in channel_input

    Args:
        channel_input: bytes that should have been written to the channel

    Returns:
        bytes: everything read while waiting for the input to show up; empty when
            channel_input is empty (nothing is read in that case)

    Raises:
        N/A

    """
    if not channel_input:
        return b""

    # lowercased input with every run of whitespace removed; used for comparison only
    wanted = b"".join(channel_input.lower().split())

    collected = b""
    while True:
        collected += await self.read()

        # backspace chars (particular problem w/ junos) and any added whitespace are dropped
        # from a throwaway copy; the bytes handed back to the caller stay untouched
        comparable = b"".join(collected.lower().replace(b"\x08", b"").split())
        if wanted in comparable:
            return collected
async def _read_until_prompt(self, buf: bytes = b"") -> bytes:
    """
    Read until expected prompt is seen.

    This reads until the "normal" `_base_channel_args.comms_prompt_pattern` is seen. The
    `_read_until_explicit_prompt` method can be used to read until some pattern in an arbitrary
    list of patterns is seen.

    Args:
        buf: output from previous reads if needed (used by scrapli netconf); newly read output
            is appended after it

    Returns:
        bytes: `buf` followed by everything read from the channel

    Raises:
        N/A

    """
    search_pattern = self._get_prompt_pattern(
        class_pattern=self._base_channel_args.comms_prompt_pattern
    )

    read_buf = BytesIO(buf)
    # a BytesIO created from initial bytes is positioned at offset 0; without this seek the
    # first chunk written below would overwrite the start of `buf` instead of following it
    read_buf.seek(0, SEEK_END)

    while True:
        read_buf.write(await self.read())

        # only the last `comms_prompt_search_depth` bytes are searched for the prompt; the
        # read() leaves the position at the end, so the next write appends
        read_buf.seek(-self._base_channel_args.comms_prompt_search_depth, SEEK_END)
        search_buf = read_buf.read()

        if re.search(pattern=search_pattern, string=search_buf):
            return read_buf.getvalue()
async def _read_until_explicit_prompt(self, prompts: List[str]) -> bytes:
    """
    Read until any one of the given prompt patterns is seen.

    Unlike `_read_until_prompt`, which waits for the "standard" pattern held in
    `_base_channel_args.comms_prompt_pattern`, this waits for *explicit* patterns supplied by
    the caller. Generally this is only used for `send_interactive` though it could be used
    elsewhere as well.

    Args:
        prompts: list of prompt patterns to look for, will return upon seeing any match

    Returns:
        bytes: output read from channel

    Raises:
        N/A

    """
    class_pattern = self._base_channel_args.comms_prompt_pattern
    candidate_patterns = [
        self._get_prompt_pattern(class_pattern=class_pattern, pattern=prompt)
        for prompt in prompts
    ]

    collected = BytesIO(b"")

    while True:
        collected.write(await self.read())

        # look only at the last `comms_prompt_search_depth` bytes read so far
        collected.seek(-self._base_channel_args.comms_prompt_search_depth, SEEK_END)
        tail = collected.read()

        if any(re.search(pattern=candidate, string=tail) for candidate in candidate_patterns):
            return collected.getvalue()
async def _read_until_prompt_or_time(
    self,
    buf: bytes = b"",
    channel_outputs: Optional[List[bytes]] = None,
    read_duration: Optional[float] = None,
) -> bytes:
    """
    Read until expected prompt is seen, outputs are seen, or for duration, whichever comes first

    As transport reading may block, transport timeout is temporarily set to the read_duration
    and any `ScrapliTimeout` that is raised while reading is ignored. The previous transport
    timeout is put back when this method exits, whether it returns or raises.

    Args:
        buf: bytes from previous reads if needed; newly read output is appended after it
        channel_outputs: List of bytes to search for in channel output, if any are seen, return
            read output
        read_duration: duration to read from channel for, 2.5 seconds when not given

    Returns:
        bytes: `buf` followed by everything read from the channel

    Raises:
        N/A

    """
    search_pattern = self._get_prompt_pattern(
        class_pattern=self._base_channel_args.comms_prompt_pattern,
    )

    if channel_outputs is None:
        channel_outputs = []
    if read_duration is None:
        read_duration = 2.5

    regex_channel_outputs_pattern = self._join_and_compile(channel_outputs=channel_outputs)

    _transport_args = self.transport._base_transport_args  # pylint: disable=W0212
    previous_timeout_transport = _transport_args.timeout_transport
    # NOTE(review): int() truncates fractional durations (the default 2.5 becomes 2 and
    # anything below 1 becomes 0) -- confirm how the transport treats a timeout of 0
    _transport_args.timeout_transport = int(read_duration)

    read_buf = BytesIO(buf)
    # a BytesIO created from initial bytes is positioned at offset 0; without this seek the
    # first chunk written below would overwrite the start of `buf` instead of following it
    read_buf.seek(0, SEEK_END)
    start = time.time()

    try:
        while True:
            try:
                read_buf.write(await self.read())
            except ScrapliTimeout:
                # a timed out read just means no new output arrived in this window
                pass

            # only the last `comms_prompt_search_depth` bytes are inspected
            read_buf.seek(-self._base_channel_args.comms_prompt_search_depth, SEEK_END)
            search_buf = read_buf.read()

            if (time.time() - start) > read_duration:
                break
            if any((channel_output in search_buf for channel_output in channel_outputs)):
                break
            if re.search(pattern=regex_channel_outputs_pattern, string=search_buf):
                break
            if re.search(pattern=search_pattern, string=search_buf):
                break
    finally:
        # restore the caller's timeout even if a read fails or the task is cancelled, so the
        # shortened timeout never leaks into later operations
        _transport_args.timeout_transport = previous_timeout_transport

    return read_buf.getvalue()
@ChannelTimeout(message="timed out during in channel ssh authentication")
async def channel_authenticate_ssh(
    self, auth_password: str, auth_private_key_passphrase: str
) -> None:
    """
    Handle SSH Authentication for transports that only operate "in the channel" (i.e. system)

    Output is read in one second slices and lowercased; password and key passphrase prompts
    are answered as they appear, and the method returns once the channel prompt pattern
    matches what has been buffered.

    Args:
        auth_password: password to authenticate with
        auth_private_key_passphrase: passphrase for ssh key if necessary

    Returns:
        None

    Raises:
        ScrapliAuthenticationFailed: if the password prompt shows up a third time
        ScrapliAuthenticationFailed: if the passphrase prompt shows up a third time

    """
    self.logger.debug("attempting in channel ssh authentication")

    prompt_pattern = self._get_prompt_pattern(
        class_pattern=self._base_channel_args.comms_prompt_pattern
    )
    seen_password_prompts = 0
    seen_passphrase_prompts = 0
    pending = b""

    async with self._channel_lock():
        while True:
            try:
                chunk = await asyncio.wait_for(self.read(), timeout=1)
            except asyncio.TimeoutError:
                # nothing arrived within this slice; carry on with what is buffered
                chunk = b""

            pending += chunk.lower()

            if b"password" in pending:
                # forget what was buffered so the same password prompt is not answered twice
                pending = b""
                seen_password_prompts += 1
                if seen_password_prompts > 2:
                    msg = "password prompt seen more than once, assuming auth failed"
                    self.logger.critical(msg)
                    raise ScrapliAuthenticationFailed(msg)
                self.write(channel_input=auth_password, redacted=True)
                self.send_return()

            if b"enter passphrase for key" in pending:
                # forget what was buffered so the same passphrase prompt is not answered twice
                pending = b""
                seen_passphrase_prompts += 1
                if seen_passphrase_prompts > 2:
                    msg = "passphrase prompt seen more than once, assuming auth failed"
                    self.logger.critical(msg)
                    raise ScrapliAuthenticationFailed(msg)
                self.write(channel_input=auth_private_key_passphrase, redacted=True)
                self.send_return()

            if re.search(pattern=prompt_pattern, string=pending):
                return
@ChannelTimeout(message="timed out during in channel telnet authentication")
async def channel_authenticate_telnet(  # noqa: C901
    self, auth_username: str = "", auth_password: str = ""
) -> None:
    """
    Handle Telnet Authentication

    Username and password prompts are answered as they appear on the channel. While reads
    come back empty, a return is sent each time another `return_interval` has elapsed since
    authentication started. The method returns once the prompt pattern provided by
    `_pre_channel_authenticate_telnet` matches what has been buffered.

    Args:
        auth_username: username to use for telnet authentication
        auth_password: password to use for telnet authentication

    Returns:
        None

    Raises:
        ScrapliAuthenticationFailed: if the password prompt shows up a third time
        ScrapliAuthenticationFailed: if the username/login prompt shows up a third time

    """
    self.logger.debug("attempting in channel telnet authentication")

    (
        username_pattern,
        password_pattern,
        prompt_pattern,
        auth_start_time,
        return_interval,
    ) = self._pre_channel_authenticate_telnet()

    # each read waits for at most one twentieth of the ops timeout
    read_interval = self._base_channel_args.timeout_ops / 20

    seen_username_prompts = 0
    seen_password_prompts = 0
    returns_due = 1
    pending = b""

    async with self._channel_lock():
        while True:
            try:
                chunk = await asyncio.wait_for(self.read(), timeout=read_interval)
            except asyncio.TimeoutError:
                chunk = b""

            if not chunk:
                # nothing was read: send a return once the next return interval has passed
                elapsed = datetime.now().timestamp() - auth_start_time
                if elapsed > (return_interval * returns_due):
                    self.send_return()
                    returns_due += 1

            pending += chunk.lower()

            if re.search(pattern=username_pattern, string=pending):
                # forget what was buffered so the same username prompt is not answered twice
                pending = b""
                seen_username_prompts += 1
                if seen_username_prompts > 2:
                    msg = "username/login prompt seen more than once, assuming auth failed"
                    self.logger.critical(msg)
                    raise ScrapliAuthenticationFailed(msg)
                self.write(channel_input=auth_username)
                self.send_return()

            if re.search(pattern=password_pattern, string=pending):
                # forget what was buffered so the same password prompt is not answered twice
                pending = b""
                seen_password_prompts += 1
                if seen_password_prompts > 2:
                    msg = "password prompt seen more than once, assuming auth failed"
                    self.logger.critical(msg)
                    raise ScrapliAuthenticationFailed(msg)
                self.write(channel_input=auth_password, redacted=True)
                self.send_return()

            if re.search(pattern=prompt_pattern, string=pending):
                return
@ChannelTimeout(message="timed out getting prompt")
async def get_prompt(self) -> str:
    """
    Send a return and read until the prompt pattern matches, then hand back the match

    Args:
        N/A

    Returns:
        str: the matched prompt, decoded and stripped of surrounding whitespace

    Raises:
        N/A

    """
    prompt_pattern = self._get_prompt_pattern(
        class_pattern=self._base_channel_args.comms_prompt_pattern
    )
    collected = b""

    async with self._channel_lock():
        self.send_return()

        while True:
            collected += await self.read()

            found = re.search(pattern=prompt_pattern, string=collected)
            if found:
                return found.group(0).decode().strip()
@ChannelTimeout(message="timed out sending input to device")
async def send_input(
    self,
    channel_input: str,
    *,
    strip_prompt: bool = True,
    eager: bool = False,
) -> Tuple[bytes, bytes]:
    """
    Primary entry point to send data to devices in shell mode; accept input and returns result

    The input is written, the channel is read until the input has been echoed, a return is
    sent, and (unless eager) output is read up to the prompt. The echoed input itself is not
    part of the returned output.

    Args:
        channel_input: string input to send to channel
        strip_prompt: strip prompt or not, defaults to True (yes, strip the prompt)
        eager: eager mode does not attempt to read to the prompt pattern after sending the
            return -- this should not be used manually! (only used by `send_configs` with the
            eager flag set)

    Returns:
        Tuple[bytes, bytes]: tuple of "raw" output and "processed" (cleaned up/stripped) output

    Raises:
        N/A

    """
    self._pre_send_input(channel_input=channel_input)

    self.logger.info(
        f"sending channel input: {channel_input}; strip_prompt: {strip_prompt}; eager: {eager}"
    )

    raw_output = b""

    async with self._channel_lock():
        self.write(channel_input=channel_input)
        # consume the echo of what was just written; its content is not needed
        await self._read_until_input(channel_input=channel_input.encode())
        self.send_return()

        if not eager:
            raw_output += await self._read_until_prompt()

    return raw_output, self._process_output(buf=raw_output, strip_prompt=strip_prompt)
@ChannelTimeout(message="timed out sending input to device")
async def send_input_and_read(
    self,
    channel_input: str,
    *,
    strip_prompt: bool = True,
    expected_outputs: Optional[List[str]] = None,
    read_duration: Optional[float] = None,
) -> Tuple[bytes, bytes]:
    """
    Send a command and read until expected prompt is seen, outputs are seen, or for duration

    Args:
        channel_input: string input to send to channel
        strip_prompt: strip prompt or not, defaults to True (yes, strip the prompt)
        expected_outputs: list of strings to look for in output; if any of these are seen,
            return output read up till that read
        read_duration: float duration to read for

    Returns:
        Tuple[bytes, bytes]: tuple of "raw" output and "processed" (cleaned up/stripped) output

    Raises:
        N/A

    """
    self._pre_send_input(channel_input=channel_input)

    # the lower level read helper works on bytes, so encode the expected outputs up front
    encoded_outputs = [expected.encode() for expected in expected_outputs or []]

    self.logger.info(
        f"sending channel input and read: {channel_input}; strip_prompt: {strip_prompt}; "
        f"expected_outputs: {expected_outputs}; read_duration: {read_duration}"
    )

    raw_output = b""

    async with self._channel_lock():
        self.write(channel_input=channel_input)
        # consume the echo of what was just written; its content is not needed
        await self._read_until_input(channel_input=channel_input.encode())
        self.send_return()

        raw_output += await self._read_until_prompt_or_time(
            channel_outputs=encoded_outputs, read_duration=read_duration
        )

    return raw_output, self._process_output(buf=raw_output, strip_prompt=strip_prompt)
@ChannelTimeout(message="timed out sending interactive input to device")
async def send_inputs_interact(
    self,
    interact_events: List[Tuple[str, str, Optional[bool]]],
    *,
    interaction_complete_patterns: Optional[List[str]] = None,
) -> Tuple[bytes, bytes]:
    """
    Interact with a device with changing prompts per input.

    Some commands answer with a different prompt after every input, and some of the inputs
    (a password for example) are never echoed back. This method walks through a list of
    "interact events" to handle such exchanges, for instance the confirmation for the command
    "clear logging" on IOSXE devices. Each event is a tuple of two or three elements: the input
    to send (str), the response to expect (str), and optionally a bool marking the input as
    "hidden" (i.e. password input).

    An example where we need this sort of capability:

    '''
    3560CX#copy flash: scp:
    Source filename []? test1.txt
    Address or name of remote host []? 172.31.254.100
    Destination username [carl]?
    Writing test1.txt
    Password:
    Password:
    Sink: C0644 639 test1.txt
    !
    639 bytes copied in 12.066 secs (53 bytes/sec)
    3560CX#
    '''

    To accomplish this we can use the following:

    '''
    interact = await conn.channel.send_inputs_interact(
        [
            ("copy flash: scp:", "Source filename []?", False),
            ("test1.txt", "Address or name of remote host []?", False),
            ("172.31.254.100", "Destination username [carl]?", False),
            ("carl", "Password:", False),
            ("super_secure_password", prompt, True),
        ]
    )
    '''

    If we needed to deal with more prompts we could simply continue adding tuples to the list of
    interact "events".

    Args:
        interact_events: list of tuples containing the "interactions" with the device
            each list element must have an input and an expected response, and may have an
            optional bool for the third and final element -- the optional bool specifies if the
            input that is sent to the device is "hidden" (ex: password), if the hidden param is
            not provided it is assumed the input is "normal" (not hidden)
        interaction_complete_patterns: list of patterns, that if seen, indicate the interactive
            "session" has ended and we should exit the interactive session.

    Returns:
        Tuple[bytes, bytes]: output read from the channel with no whitespace trimming/cleaning,
            and the output read from the channel that has been "cleaned up"

    Raises:
        N/A

    """
    self._pre_send_inputs_interact(interact_events=interact_events)

    buf = b""
    processed_buf = b""

    async with self._channel_lock():
        for interact_event in interact_events:
            channel_input, channel_response = interact_event[0], interact_event[1]

            # a missing third element means the input is "normal" (not hidden)
            hidden_input = interact_event[2] if len(interact_event) > 2 else False

            # any completion pattern ends this read just like the expected response does
            prompts = [channel_response]
            if interaction_complete_patterns is not None:
                prompts += interaction_complete_patterns

            # keep hidden input out of the log output
            _channel_input = "REDACTED" if hidden_input else channel_input
            self.logger.info(
                f"sending interactive input: {_channel_input}; "
                f"expecting: {channel_response}; "
                f"hidden_input: {hidden_input}"
            )

            self.write(channel_input=channel_input, redacted=bool(hidden_input))

            if channel_response and hidden_input is not True:
                # the input is expected to be echoed: read it back before sending the return
                buf += await self._read_until_input(channel_input=channel_input.encode())
            self.send_return()

            buf += await self._read_until_explicit_prompt(prompts=prompts)

    processed_buf += self._process_output(
        buf=buf,
        strip_prompt=False,
    )
    return buf, processed_buf
Ancestors (in MRO)
- scrapli.channel.base_channel.BaseChannel
Methods
channel_authenticate_ssh
channel_authenticate_ssh(self, auth_password: str, auth_private_key_passphrase: str) -> None
channel_authenticate_telnet
channel_authenticate_telnet(self, auth_username: str = '', auth_password: str = '') -> None
get_prompt
get_prompt(self) -> str
read
read(self) -> bytes
send_input
send_input(self, channel_input: str, *, strip_prompt: bool = True, eager: bool = False) -> Tuple[bytes, bytes]
send_input_and_read
send_input_and_read(self, channel_input: str, *, strip_prompt: bool = True, expected_outputs: Optional[List[str]] = None, read_duration: Optional[float] = None) -> Tuple[bytes, bytes]
send_inputs_interact
send_inputs_interact(self, interact_events: List[Tuple[str, str, Optional[bool]]], *, interaction_complete_patterns: Optional[List[str]] = None) -> Tuple[bytes, bytes]