Module scrapli_netconf.channel.async_channel¶
"""scrapli_netconf.channel.async_channel"""
from scrapli.channel import AsyncChannel
from scrapli.channel.base_channel import BaseChannelArgs
from scrapli.decorators import ChannelTimeout
from scrapli.transport.base.async_transport import AsyncTransport
from scrapli_netconf.channel.base_channel import BaseNetconfChannel, NetconfBaseChannelArgs
from scrapli_netconf.constants import NetconfVersion
class AsyncNetconfChannel(AsyncChannel, BaseNetconfChannel):
def __init__(
self,
transport: AsyncTransport,
base_channel_args: BaseChannelArgs,
netconf_base_channel_args: NetconfBaseChannelArgs,
):
super().__init__(transport=transport, base_channel_args=base_channel_args)
self._netconf_base_channel_args = netconf_base_channel_args
# always use `]]>]]>` as the initial prompt to match
self._base_channel_args.comms_prompt_pattern = "]]>]]>"
        # None until we learn whether the server echoes our input back to us
        self._server_echo: Optional[bool] = None
self._capabilities_buf = b""
async def open_netconf(self) -> None:
"""
Open the netconf channel
Args:
N/A
Returns:
None
Raises:
N/A
"""
# open in scrapli core is where we open channel log (if applicable), do that
self.open()
raw_server_capabilities = await self._get_server_capabilities()
self._process_capabilities_exchange(raw_server_capabilities=raw_server_capabilities)
await self._send_client_capabilities()
@ChannelTimeout(
"timed out determining if session is authenticated/getting server capabilities",
)
async def _get_server_capabilities(self) -> bytes:
"""
Read until all server capabilities have been sent by server
Args:
N/A
Returns:
bytes: raw bytes containing server capabilities
Raises:
N/A
"""
capabilities_buf = self._capabilities_buf
# reset this to empty to avoid any confusion now that we are moving on
self._capabilities_buf = b""
async with self._channel_lock():
while b"]]>]]>" not in capabilities_buf:
capabilities_buf += await self.read()
self.logger.debug(f"received raw server capabilities: {repr(capabilities_buf)}")
return capabilities_buf
@ChannelTimeout("timed out sending client capabilities")
async def _send_client_capabilities(
self,
) -> None:
"""
Send client capabilities to the netconf server
Args:
N/A
Returns:
None
Raises:
N/A
"""
async with self._channel_lock():
_ = self._pre_send_client_capabilities(
client_capabilities=self._netconf_base_channel_args.client_capabilities
)
self.send_return()
async def _read_until_input(self, channel_input: bytes) -> bytes:
"""
Async read until all input has been entered.
Args:
            channel_input: bytes of input that was written to the channel
Returns:
bytes: output read from channel
Raises:
N/A
"""
output = b""
if self._server_echo is None or self._server_echo is False:
            # if server_echo is `None` we don't yet know whether the server echoes, so just return
            # nothing; if it is False we know it does not echo and can return the empty byte string
return output
if not channel_input:
self.logger.info(f"Read: {repr(output)}")
return output
while True:
output += await self.read()
# if we have all the input *or* we see the closing rpc tag we know we are done here
if channel_input in output or b"rpc>" in output:
break
self.logger.info(f"Read: {repr(output)}")
return output
async def send_input_netconf(self, channel_input: str) -> bytes:
"""
Send inputs to netconf server
Args:
channel_input: string of the base xml message to send to netconf server
Returns:
bytes: bytes result of message sent to netconf server
Raises:
N/A
"""
bytes_final_channel_input = channel_input.encode()
buf: bytes
buf, _ = await super().send_input(
channel_input=channel_input, strip_prompt=False, eager=True
)
if bytes_final_channel_input in buf:
buf = buf.split(bytes_final_channel_input)[1]
buf = await self._read_until_prompt(buf=buf)
if self._server_echo is None:
            # At least per early drafts of the netconf over ssh RFCs, netconf servers MUST NOT
            # echo the input commands back to the client. In the case of "normal" scrapli netconf
            # with the system transport this happens anyway because we combine the stdin and stdout
            # fds into a single pty, however for other transports we have an actual stdin and
            # stdout fd to read/write. At the very least IOSXE with NETCONF 1.1 seems to want to
            # echo inputs back onto the stdout of the channel. This is totally ok and we can deal
            # with it, we just need to *know* that it is happening, so while the _server_echo
            # attribute is still `None`, we can go ahead and see if the input we sent is in the
            # output we read off the channel. If it is *not*, we know the server does *not* echo
            # and we can move on. If it *is* in the output, we know the server echoes, and we also
            # have one additional step in that we need to read "until prompt" again in order to
            # capture the reply to our rpc.
            #
            # See: https://tools.ietf.org/html/draft-ietf-netconf-ssh-02 (search for "echo")
self.logger.debug("server echo is unset, determining if server echoes inputs now")
if bytes_final_channel_input in buf:
self.logger.debug("server echoes inputs, setting _server_echo to 'true'")
self._server_echo = True
# since echo is True and we only read until our input (because our inputs always end
# with a "prompt" that we read until) we need to once again read until prompt, this
# read will read all the way up through the *reply* to the prompt at end of the
# reply message
buf = await self._read_until_prompt(buf=b"")
else:
self.logger.debug("server does *not* echo inputs, setting _server_echo to 'false'")
self._server_echo = False
if self._netconf_base_channel_args.netconf_version == NetconfVersion.VERSION_1_1:
# netconf 1.1 with "chunking" style message format needs an extra return char here
self.send_return()
return buf
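In normal use this channel is not instantiated directly: the async NETCONF driver builds it during connection setup and drives it through open_netconf and send_input_netconf. A minimal sketch of that typical path, assuming an asyncssh transport and placeholder host/credentials:

import asyncio

from scrapli_netconf.driver import AsyncNetconfDriver


async def main() -> None:
    # placeholder device details -- swap in a reachable netconf-over-ssh device
    conn = AsyncNetconfDriver(
        host="192.0.2.1",
        auth_username="user",
        auth_password="password",
        auth_strict_key=False,
        transport="asyncssh",
    )
    # driver open handles transport/auth, then calls the channel's open_netconf(),
    # which reads the server capabilities and sends the client <hello>
    await conn.open()
    # high level helpers like get_config ultimately funnel through
    # channel.send_input_netconf()
    response = await conn.get_config(source="running")
    print(response.result)
    await conn.close()


if __name__ == "__main__":
    asyncio.run(main())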
Classes¶
AsyncNetconfChannel¶
Ancestors (in MRO)¶
- scrapli.channel.async_channel.AsyncChannel
- scrapli_netconf.channel.base_channel.BaseNetconfChannel
- scrapli.channel.base_channel.BaseChannel
Methods¶
open_netconf¶
open_netconf(self) -> None
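During open_netconf the channel reads from the transport until the ]]>]]> delimiter that ends the server's <hello>, processes the capabilities, and then writes the client capabilities followed by a return. Purely for orientation, a base 1.0 client <hello> on the wire looks roughly like the string below; the capability list and framing are illustrative, the real payload comes from NetconfBaseChannelArgs.client_capabilities:

# illustrative only -- the capabilities actually sent are taken from
# NetconfBaseChannelArgs.client_capabilities, not from this hard coded string
CLIENT_HELLO_1_0 = (
    '<?xml version="1.0" encoding="utf-8"?>'
    '<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">'
    "<capabilities>"
    "<capability>urn:ietf:params:netconf:base:1.0</capability>"
    "</capabilities>"
    "</hello>"
    "]]>]]>"
)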
send_input_netconf¶
send_input_netconf(self, channel_input: str) -> bytes
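When the session negotiated NETCONF 1.1, messages use the RFC 6242 "chunked" framing instead of the ]]>]]> delimiter, which is why send_input_netconf sends an extra return on 1.1 sessions. A minimal sketch of that wire framing, for illustration only (framing of outgoing rpcs is handled elsewhere in scrapli_netconf, not in this method):

def chunk_frame(rpc: str) -> str:
    # rfc6242 chunked framing: a "#<byte-count>" header per chunk and "##" to end the
    # message; a single-chunk frame is enough to illustrate the format
    return f"\n#{len(rpc.encode())}\n{rpc}\n##\n"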