Module scrapli_netconf.channel.sync_channel¶
scrapli_netconf.channel.sync_channel
Expand source code
"""scrapli_netconf.channel.sync_channel"""
import re
from typing import Optional
from scrapli.channel import Channel
from scrapli.channel.base_channel import BaseChannelArgs
from scrapli.decorators import ChannelTimeout
from scrapli.exceptions import ScrapliAuthenticationFailed, ScrapliTimeout
from scrapli.transport.base import Transport
from scrapli_netconf.channel.base_channel import BaseNetconfChannel, NetconfBaseChannelArgs
from scrapli_netconf.constants import NetconfVersion
# Matches the opening of a netconf "hello" element in raw channel bytes: a literal `<`, an
# optional namespace prefix (word characters followed by `:`), then `hello`; case-insensitive.
HELLO_MATCH = re.compile(pattern=rb"<(\w+\:){0,1}hello", flags=re.I)
class NetconfChannel(Channel, BaseNetconfChannel):
    """
    Synchronous netconf channel

    Layers the netconf specific behavior visible in this class -- in channel authentication,
    server/client capabilities exchange and server echo detection -- on top of the scrapli core
    `Channel` and the shared `BaseNetconfChannel` helpers.

    """

    def __init__(
        self,
        transport: Transport,
        base_channel_args: BaseChannelArgs,
        netconf_base_channel_args: NetconfBaseChannelArgs,
    ):
        """
        Initialize the netconf channel

        Args:
            transport: transport object passed through to the scrapli core channel
            base_channel_args: base channel args passed through to the scrapli core channel
            netconf_base_channel_args: netconf specific channel args (client capabilities,
                netconf version, ...)

        Returns:
            None

        Raises:
            N/A

        """
        super().__init__(transport=transport, base_channel_args=base_channel_args)

        self._netconf_base_channel_args = netconf_base_channel_args

        # always use `]]>]]>` as the initial prompt to match
        self._base_channel_args.comms_prompt_pattern = "]]>]]>"
        # stays `None` until `send_input_netconf` has determined if the server echoes inputs
        self._server_echo: Optional[bool] = None
        # raw bytes read during in channel authentication; consumed (and reset) by
        # `_get_server_capabilities`
        self._capabilities_buf = b""

    def open_netconf(self) -> None:
        """
        Open the netconf channel

        Opens the underlying channel, reads the server capabilities, processes the capabilities
        exchange and finally sends the client capabilities.

        Args:
            N/A

        Returns:
            None

        Raises:
            N/A

        """
        # open in scrapli core is where we open channel log (if applicable), do that
        self.open()

        raw_server_capabilities = self._get_server_capabilities()

        self._process_capabilities_exchange(raw_server_capabilities=raw_server_capabilities)
        self._send_client_capabilities()

    @staticmethod
    def _authenticate_check_hello(buf: bytes) -> bool:
        """
        Check if "hello" message is in output

        Args:
            buf: bytes output from the channel

        Returns:
            bool: true if hello message is seen, otherwise false

        Raises:
            N/A

        """
        return bool(re.search(pattern=HELLO_MATCH, string=buf))

    @ChannelTimeout("timed out during in channel netconf authentication")
    def channel_authenticate_netconf(
        self, auth_password: str, auth_private_key_passphrase: str
    ) -> None:
        """
        Handle SSH Authentication for transports that only operate "in the channel" (i.e. system)

        Reads from the channel until the start of the server "hello" is seen, answering password
        and key passphrase prompts along the way. Everything read is also stored (unmodified) in
        `_capabilities_buf` so the capabilities that arrive with/after the hello are not lost.

        Args:
            auth_password: password to authenticate with
            auth_private_key_passphrase: passphrase for ssh key if necessary

        Returns:
            None

        Raises:
            ScrapliAuthenticationFailed: if password prompt seen more than twice
            ScrapliAuthenticationFailed: if passphrase prompt seen more than twice

        """
        self.logger.debug("attempting in channel netconf authentication")

        password_count = 0
        passphrase_count = 0
        authenticate_buf = b""

        with self._channel_lock():
            while True:
                buf = self.read()

                # prompts are matched against a lower-cased copy; the capabilities buffer keeps
                # the original bytes
                authenticate_buf += buf.lower()
                self._capabilities_buf += buf

                # inherited handler (defined outside this file) -- presumably reacts to ssh
                # error messages in the output; confirm against scrapli core
                self._ssh_message_handler(output=authenticate_buf)

                if b"password" in authenticate_buf:
                    # clear the authentication buffer so we don't re-read the password prompt
                    authenticate_buf = b""
                    password_count += 1
                    if password_count > 2:
                        msg = "password prompt seen more than twice, assuming auth failed"
                        self.logger.critical(msg)
                        raise ScrapliAuthenticationFailed(msg)
                    self.write(channel_input=auth_password, redacted=True)
                    self.send_return()

                if b"enter passphrase for key" in authenticate_buf:
                    # clear the authentication buffer so we don't re-read the passphrase prompt
                    authenticate_buf = b""
                    passphrase_count += 1
                    if passphrase_count > 2:
                        msg = "passphrase prompt seen more than twice, assuming auth failed"
                        self.logger.critical(msg)
                        raise ScrapliAuthenticationFailed(msg)
                    self.write(channel_input=auth_private_key_passphrase, redacted=True)
                    self.send_return()

                if self._authenticate_check_hello(buf=authenticate_buf):
                    self.logger.info(
                        "found start of server capabilities, authentication successful"
                    )
                    return

    @ChannelTimeout(
        "timed out determining if session is authenticated/getting server capabilities",
    )
    def _get_server_capabilities(self) -> bytes:
        """
        Read until all server capabilities have been sent by server

        Starts from whatever was already buffered during in channel authentication and keeps
        reading until the `]]>]]>` delimiter is present.

        Args:
            N/A

        Returns:
            bytes: raw bytes containing server capabilities

        Raises:
            N/A

        """
        capabilities_buf = self._capabilities_buf

        # reset this to empty to avoid any confusion now that we are moving on
        self._capabilities_buf = b""

        with self._channel_lock():
            while b"]]>]]>" not in capabilities_buf:
                capabilities_buf += self.read()
            self.logger.debug(f"received raw server capabilities: {repr(capabilities_buf)}")

        return capabilities_buf

    @ChannelTimeout("timed out sending client capabilities")
    def _send_client_capabilities(
        self,
    ) -> None:
        """
        Send client capabilities to the netconf server

        Args:
            N/A

        Returns:
            None

        Raises:
            N/A

        """
        with self._channel_lock():
            # `_pre_send_client_capabilities` is inherited (defined outside this file); it
            # returns the capabilities as bytes -- presumably after writing them to the channel
            bytes_client_capabilities = self._pre_send_client_capabilities(
                client_capabilities=self._netconf_base_channel_args.client_capabilities
            )
            self._read_until_input(channel_input=bytes_client_capabilities)
            self.send_return()

    def _read_until_input(self, channel_input: bytes) -> bytes:
        """
        Read until all input has been entered.

        Only actually reads when the server is known to echo inputs (`_server_echo` is `True`);
        otherwise there is nothing to wait for and an empty byte string is returned immediately.

        Args:
            channel_input: bytes of the input that was written to the channel

        Returns:
            bytes: output read from channel

        Raises:
            N/A

        """
        output = b""

        if self._server_echo is None or self._server_echo is False:
            # if server_echo is `None` we dont know if the server echoes yet, so just return nothing
            # if its False we know it doesnt echo and we can return empty byte string anyway
            return output

        if not channel_input:
            self.logger.info(f"Read: {repr(output)}")
            return output

        while True:
            output += self.read()

            # if we have all the input *or* we see the closing rpc tag we know we are done here
            if channel_input in output or b"rpc>" in output:
                break

        self.logger.info(f"Read: {repr(output)}")
        return output

    def send_input_netconf(self, channel_input: str) -> bytes:
        """
        Send inputs to netconf server

        Args:
            channel_input: string of the base xml message to send to netconf server

        Returns:
            bytes: bytes result of message sent to netconf server

        Raises:
            ScrapliTimeout: re-raises channel timeouts with additional message if channel input may
                be big enough to require setting `use_compressed_parser` to false -- note that this
                has only been seen as an issue with NXOS so far.

        """
        bytes_final_channel_input = channel_input.encode()

        buf: bytes
        buf, _ = super().send_input(channel_input=channel_input, strip_prompt=False, eager=True)

        if bytes_final_channel_input in buf:
            # if we got the input AND the rpc-reply we can strip out our inputs so we just have the
            # reply remaining
            buf = buf.split(bytes_final_channel_input)[1]

        try:
            buf = self._read_until_prompt(buf=buf)
        except ScrapliTimeout as exc:
            if len(channel_input) >= 4096:
                msg = (
                    "timed out finding prompt after sending input, input is greater than 4096 "
                    "chars, try setting 'use_compressed_parser' to False"
                )
                self.logger.info(msg)
                raise ScrapliTimeout(msg) from exc
            raise ScrapliTimeout from exc

        if self._server_echo is None:
            # At least per early drafts of the netconf over ssh rfcs the netconf servers MUST NOT
            # echo the input commands back to the client. In the case of "normal" scrapli netconf
            # with the system transport this happens anyway because we combine the stdin and stdout
            # fds into a single pty, however for other transports we have an actual stdin and
            # stdout fd to read/write. It seems that at the very least IOSXE with NETCONF 1.1 seems
            # to want to echo inputs back onto to the stdout for the channel. This is totally ok
            # and we can deal with it, we just need to *know* that it is happening, so while the
            # _server_echo attribute is still `None`, we can go ahead and see if the input we sent
            # is in the output we read off the channel. If it is *not* we know the server does *not*
            # echo and we can move on. If it *is* in the output, we know the server echoes, and we
            # also have one additional step in that we need to read "until prompt" again in order to
            # capture the reply to our rpc.
            #
            # See: https://tools.ietf.org/html/draft-ietf-netconf-ssh-02 (search for "echo")
            self.logger.debug("server echo is unset, determining if server echoes inputs now")

            if bytes_final_channel_input in buf:
                self.logger.debug("server echoes inputs, setting _server_echo to 'true'")
                self._server_echo = True

                # since echo is True and we only read until our input (because our inputs always end
                # with a "prompt" that we read until) we need to once again read until prompt, this
                # read will read all the way up through the *reply* to the prompt at end of the
                # reply message
                buf = self._read_until_prompt(buf=b"")
            else:
                self.logger.debug("server does *not* echo inputs, setting _server_echo to 'false'")
                self._server_echo = False

        if self._netconf_base_channel_args.netconf_version == NetconfVersion.VERSION_1_1:
            # netconf 1.1 with "chunking" style message format needs an extra return char here
            self.send_return()

        return buf
Classes¶
NetconfChannel¶
1 2 3 4 5 6 7 8 9 10 11 |
|
Expand source code
class NetconfChannel(Channel, BaseNetconfChannel):
    """
    Synchronous netconf channel

    Layers the netconf specific behavior visible in this class -- in channel authentication,
    server/client capabilities exchange and server echo detection -- on top of the scrapli core
    `Channel` and the shared `BaseNetconfChannel` helpers.

    """

    def __init__(
        self,
        transport: Transport,
        base_channel_args: BaseChannelArgs,
        netconf_base_channel_args: NetconfBaseChannelArgs,
    ):
        """
        Initialize the netconf channel

        Args:
            transport: transport object passed through to the scrapli core channel
            base_channel_args: base channel args passed through to the scrapli core channel
            netconf_base_channel_args: netconf specific channel args (client capabilities,
                netconf version, ...)

        Returns:
            None

        Raises:
            N/A

        """
        super().__init__(transport=transport, base_channel_args=base_channel_args)

        self._netconf_base_channel_args = netconf_base_channel_args

        # always use `]]>]]>` as the initial prompt to match
        self._base_channel_args.comms_prompt_pattern = "]]>]]>"
        # stays `None` until `send_input_netconf` has determined if the server echoes inputs
        self._server_echo: Optional[bool] = None
        # raw bytes read during in channel authentication; consumed (and reset) by
        # `_get_server_capabilities`
        self._capabilities_buf = b""

    def open_netconf(self) -> None:
        """
        Open the netconf channel

        Opens the underlying channel, reads the server capabilities, processes the capabilities
        exchange and finally sends the client capabilities.

        Args:
            N/A

        Returns:
            None

        Raises:
            N/A

        """
        # open in scrapli core is where we open channel log (if applicable), do that
        self.open()

        raw_server_capabilities = self._get_server_capabilities()

        self._process_capabilities_exchange(raw_server_capabilities=raw_server_capabilities)
        self._send_client_capabilities()

    @staticmethod
    def _authenticate_check_hello(buf: bytes) -> bool:
        """
        Check if "hello" message is in output

        Args:
            buf: bytes output from the channel

        Returns:
            bool: true if hello message is seen, otherwise false

        Raises:
            N/A

        """
        return bool(re.search(pattern=HELLO_MATCH, string=buf))

    @ChannelTimeout("timed out during in channel netconf authentication")
    def channel_authenticate_netconf(
        self, auth_password: str, auth_private_key_passphrase: str
    ) -> None:
        """
        Handle SSH Authentication for transports that only operate "in the channel" (i.e. system)

        Reads from the channel until the start of the server "hello" is seen, answering password
        and key passphrase prompts along the way. Everything read is also stored (unmodified) in
        `_capabilities_buf` so the capabilities that arrive with/after the hello are not lost.

        Args:
            auth_password: password to authenticate with
            auth_private_key_passphrase: passphrase for ssh key if necessary

        Returns:
            None

        Raises:
            ScrapliAuthenticationFailed: if password prompt seen more than twice
            ScrapliAuthenticationFailed: if passphrase prompt seen more than twice

        """
        self.logger.debug("attempting in channel netconf authentication")

        password_count = 0
        passphrase_count = 0
        authenticate_buf = b""

        with self._channel_lock():
            while True:
                buf = self.read()

                # prompts are matched against a lower-cased copy; the capabilities buffer keeps
                # the original bytes
                authenticate_buf += buf.lower()
                self._capabilities_buf += buf

                # inherited handler (defined outside this file) -- presumably reacts to ssh
                # error messages in the output; confirm against scrapli core
                self._ssh_message_handler(output=authenticate_buf)

                if b"password" in authenticate_buf:
                    # clear the authentication buffer so we don't re-read the password prompt
                    authenticate_buf = b""
                    password_count += 1
                    if password_count > 2:
                        msg = "password prompt seen more than twice, assuming auth failed"
                        self.logger.critical(msg)
                        raise ScrapliAuthenticationFailed(msg)
                    self.write(channel_input=auth_password, redacted=True)
                    self.send_return()

                if b"enter passphrase for key" in authenticate_buf:
                    # clear the authentication buffer so we don't re-read the passphrase prompt
                    authenticate_buf = b""
                    passphrase_count += 1
                    if passphrase_count > 2:
                        msg = "passphrase prompt seen more than twice, assuming auth failed"
                        self.logger.critical(msg)
                        raise ScrapliAuthenticationFailed(msg)
                    self.write(channel_input=auth_private_key_passphrase, redacted=True)
                    self.send_return()

                if self._authenticate_check_hello(buf=authenticate_buf):
                    self.logger.info(
                        "found start of server capabilities, authentication successful"
                    )
                    return

    @ChannelTimeout(
        "timed out determining if session is authenticated/getting server capabilities",
    )
    def _get_server_capabilities(self) -> bytes:
        """
        Read until all server capabilities have been sent by server

        Starts from whatever was already buffered during in channel authentication and keeps
        reading until the `]]>]]>` delimiter is present.

        Args:
            N/A

        Returns:
            bytes: raw bytes containing server capabilities

        Raises:
            N/A

        """
        capabilities_buf = self._capabilities_buf

        # reset this to empty to avoid any confusion now that we are moving on
        self._capabilities_buf = b""

        with self._channel_lock():
            while b"]]>]]>" not in capabilities_buf:
                capabilities_buf += self.read()
            self.logger.debug(f"received raw server capabilities: {repr(capabilities_buf)}")

        return capabilities_buf

    @ChannelTimeout("timed out sending client capabilities")
    def _send_client_capabilities(
        self,
    ) -> None:
        """
        Send client capabilities to the netconf server

        Args:
            N/A

        Returns:
            None

        Raises:
            N/A

        """
        with self._channel_lock():
            # `_pre_send_client_capabilities` is inherited (defined outside this file); it
            # returns the capabilities as bytes -- presumably after writing them to the channel
            bytes_client_capabilities = self._pre_send_client_capabilities(
                client_capabilities=self._netconf_base_channel_args.client_capabilities
            )
            self._read_until_input(channel_input=bytes_client_capabilities)
            self.send_return()

    def _read_until_input(self, channel_input: bytes) -> bytes:
        """
        Read until all input has been entered.

        Only actually reads when the server is known to echo inputs (`_server_echo` is `True`);
        otherwise there is nothing to wait for and an empty byte string is returned immediately.

        Args:
            channel_input: bytes of the input that was written to the channel

        Returns:
            bytes: output read from channel

        Raises:
            N/A

        """
        output = b""

        if self._server_echo is None or self._server_echo is False:
            # if server_echo is `None` we dont know if the server echoes yet, so just return nothing
            # if its False we know it doesnt echo and we can return empty byte string anyway
            return output

        if not channel_input:
            self.logger.info(f"Read: {repr(output)}")
            return output

        while True:
            output += self.read()

            # if we have all the input *or* we see the closing rpc tag we know we are done here
            if channel_input in output or b"rpc>" in output:
                break

        self.logger.info(f"Read: {repr(output)}")
        return output

    def send_input_netconf(self, channel_input: str) -> bytes:
        """
        Send inputs to netconf server

        Args:
            channel_input: string of the base xml message to send to netconf server

        Returns:
            bytes: bytes result of message sent to netconf server

        Raises:
            ScrapliTimeout: re-raises channel timeouts with additional message if channel input may
                be big enough to require setting `use_compressed_parser` to false -- note that this
                has only been seen as an issue with NXOS so far.

        """
        bytes_final_channel_input = channel_input.encode()

        buf: bytes
        buf, _ = super().send_input(channel_input=channel_input, strip_prompt=False, eager=True)

        if bytes_final_channel_input in buf:
            # if we got the input AND the rpc-reply we can strip out our inputs so we just have the
            # reply remaining
            buf = buf.split(bytes_final_channel_input)[1]

        try:
            buf = self._read_until_prompt(buf=buf)
        except ScrapliTimeout as exc:
            if len(channel_input) >= 4096:
                msg = (
                    "timed out finding prompt after sending input, input is greater than 4096 "
                    "chars, try setting 'use_compressed_parser' to False"
                )
                self.logger.info(msg)
                raise ScrapliTimeout(msg) from exc
            raise ScrapliTimeout from exc

        if self._server_echo is None:
            # At least per early drafts of the netconf over ssh rfcs the netconf servers MUST NOT
            # echo the input commands back to the client. In the case of "normal" scrapli netconf
            # with the system transport this happens anyway because we combine the stdin and stdout
            # fds into a single pty, however for other transports we have an actual stdin and
            # stdout fd to read/write. It seems that at the very least IOSXE with NETCONF 1.1 seems
            # to want to echo inputs back onto to the stdout for the channel. This is totally ok
            # and we can deal with it, we just need to *know* that it is happening, so while the
            # _server_echo attribute is still `None`, we can go ahead and see if the input we sent
            # is in the output we read off the channel. If it is *not* we know the server does *not*
            # echo and we can move on. If it *is* in the output, we know the server echoes, and we
            # also have one additional step in that we need to read "until prompt" again in order to
            # capture the reply to our rpc.
            #
            # See: https://tools.ietf.org/html/draft-ietf-netconf-ssh-02 (search for "echo")
            self.logger.debug("server echo is unset, determining if server echoes inputs now")

            if bytes_final_channel_input in buf:
                self.logger.debug("server echoes inputs, setting _server_echo to 'true'")
                self._server_echo = True

                # since echo is True and we only read until our input (because our inputs always end
                # with a "prompt" that we read until) we need to once again read until prompt, this
                # read will read all the way up through the *reply* to the prompt at end of the
                # reply message
                buf = self._read_until_prompt(buf=b"")
            else:
                self.logger.debug("server does *not* echo inputs, setting _server_echo to 'false'")
                self._server_echo = False

        if self._netconf_base_channel_args.netconf_version == NetconfVersion.VERSION_1_1:
            # netconf 1.1 with "chunking" style message format needs an extra return char here
            self.send_return()

        return buf
Ancestors (in MRO)¶
- scrapli.channel.sync_channel.Channel
- scrapli_netconf.channel.base_channel.BaseNetconfChannel
- scrapli.channel.base_channel.BaseChannel
Methods¶
channel_authenticate_netconf¶
channel_authenticate_netconf(self, auth_password: str, auth_private_key_passphrase: str) ‑> NoneType
1 2 3 4 5 6 7 8 9 10 11 12 |
|
open_netconf¶
open_netconf(self) ‑> NoneType
1 2 3 4 5 6 7 8 9 10 |
|
send_input_netconf¶
send_input_netconf(self, channel_input: str) ‑> bytes
1 2 3 4 5 6 7 8 9 10 11 12 |
|