Archived documentation version rendered and hosted by DevNetExpertTraining.com

Module netmiko.utilities

Miscellaneous utility functions.

Source code
"""Miscellaneous utility functions."""
from glob import glob
import sys
import io
import os
from pathlib import Path
import functools
from datetime import datetime
from netmiko._textfsm import _clitable as clitable
from netmiko._textfsm._clitable import CliTableError
from netmiko import log

try:
    from ttp import ttp

    TTP_INSTALLED = True

except ImportError:
    TTP_INSTALLED = False

try:
    from genie.conf.base import Device
    from genie.libs.parser.utils import get_parser
    from pyats.datastructures import AttrDict

    GENIE_INSTALLED = True
except ImportError:
    GENIE_INSTALLED = False

# If we are on python < 3.7, we need to force the import of importlib.resources backport
try:
    from importlib.resources import path as importresources_path
except ModuleNotFoundError:
    from importlib_resources import path as importresources_path

try:
    import serial.tools.list_ports

    PYSERIAL_INSTALLED = True
except ImportError:
    PYSERIAL_INSTALLED = False

# Dictionary mapping 'show run' for vendors with different command
# Keys look like netmiko device_type names; each value is the command string used
# on that platform to display the configuration.
SHOW_RUN_MAPPER = {
    "brocade_fos": "configShow",
    "juniper": "show configuration",
    "juniper_junos": "show configuration",
    "extreme": "show configuration",
    "extreme_ers": "show running-config",
    "extreme_exos": "show configuration",
    "extreme_netiron": "show running-config",
    "extreme_nos": "show running-config",
    "extreme_slx": "show running-config",
    "extreme_vdx": "show running-config",
    "extreme_vsp": "show running-config",
    "extreme_wing": "show running-config",
    "ericsson_ipos": "show configuration",
    "hp_comware": "display current-configuration",
    "huawei": "display current-configuration",
    "fortinet": "show full-configuration",
    "checkpoint": "show configuration",
    "cisco_wlc": "show run-config",
    "enterasys": "show running-config",
    "dell_force10": "show running-config",
    "avaya_vsp": "show running-config",
    "avaya_ers": "show running-config",
    "brocade_vdx": "show running-config",
    "brocade_nos": "show running-config",
    "brocade_fastiron": "show running-config",
    "brocade_netiron": "show running-config",
    "alcatel_aos": "show configuration snapshot",
    "cros_mtbr": "show running-config",
}

# Expand SHOW_RUN_MAPPER to include '_ssh' key
# Each original key is kept and a second "<key>_ssh" entry with the same command is
# added directly after it; the expanded dict then replaces SHOW_RUN_MAPPER.
# Note: new_dict, k, v and new_key remain defined as module-level names afterwards.
new_dict = {}
for k, v in SHOW_RUN_MAPPER.items():
    new_key = k + "_ssh"
    new_dict[k] = v
    new_dict[new_key] = v
SHOW_RUN_MAPPER = new_dict

# Default location of netmiko temp directory for netmiko tools
# find_netmiko_dir() falls back to this value when NETMIKO_DIR is not set and
# expands the leading "~".
NETMIKO_BASE_DIR = "~/.netmiko"


def load_yaml_file(yaml_file):
    """
    Parse a YAML file and return its contents.

    The interpreter is terminated through ``sys.exit`` when the yaml module cannot
    be imported or when reading the file raises IOError.

    :param yaml_file: path of the YAML file (read as UTF-8 text)
    :return: the object produced by ``yaml.safe_load``
    """
    try:
        import yaml
    except ImportError:
        sys.exit("Unable to import yaml module.")

    try:
        with io.open(yaml_file, mode="rt", encoding="utf-8") as stream:
            parsed = yaml.safe_load(stream)
    except IOError:
        sys.exit(f"Unable to open YAML file: {yaml_file}")
    return parsed


def load_devices(file_name=None):
    """
    Locate the netmiko-tools inventory file and return its parsed YAML contents.

    :param file_name: optional explicit path, forwarded to ``find_cfg_file``
    :return: whatever ``load_yaml_file`` returns for the located file
    """
    return load_yaml_file(find_cfg_file(file_name))


def find_cfg_file(file_name=None):
    """
    Locate the netmiko_tools inventory file.

    Search order:
    1) ``file_name`` argument, when given and it names an existing file
    2) NETMIKO_TOOLS_CFG environment variable, when it points directly at a file
    3) Directory named by NETMIKO_TOOLS_CFG
    4) Current directory
    5) Home directory

    Inside a directory ``.netmiko.yml`` is preferred over ``netmiko.yml``.

    :param file_name: optional explicit path to the inventory file
    :return: path of the first inventory file found
    :raises IOError: if no inventory file is found in any location
    """
    if file_name and os.path.isfile(file_name):
        return file_name
    optional_path = os.environ.get("NETMIKO_TOOLS_CFG", "")
    if os.path.isfile(optional_path):
        return optional_path
    # Drop optional_path from the search when the variable is unset or empty
    candidates = (optional_path, ".", os.path.expanduser("~"))
    search_paths = [path for path in candidates if path]
    for path in search_paths:
        for base_name in (".netmiko.yml", "netmiko.yml"):
            # Test the literal path rather than globbing it: glob would interpret
            # '[', '*' or '?' inside the directory name as pattern syntax and miss
            # a file that really exists there.
            candidate = f"{path}/{base_name}"
            if os.path.isfile(candidate):
                return candidate
    raise IOError(
        ".netmiko.yml file not found in NETMIKO_TOOLS_CFG environment variable"
        " directory, current directory, or home directory."
    )


def display_inventory(my_devices):
    """
    Print the inventory as a device table followed by a group list.

    Entries whose value is a list are listed as groups, entries whose value is a
    dict are listed as devices together with their "device_type"; other values are
    ignored. The group 'all' is always listed.

    :param my_devices: mapping of inventory names to device dicts or group lists
    """
    separator = "-" * 40
    group_names = sorted(
        ["all"]
        + [name for name, entry in my_devices.items() if isinstance(entry, list)]
    )
    device_rows = sorted(
        (
            (name, entry["device_type"])
            for name, entry in my_devices.items()
            if isinstance(entry, dict)
        ),
        key=lambda row: row[0],
    )

    print("\nDevices:")
    print(separator)
    for name, platform in device_rows:
        label = f"  ({platform})"
        print(f"{name:<25}{label:>15}")
    print("\n\nGroups:")
    print(separator)
    for group in group_names:
        print(group)
    print()


def obtain_all_devices(my_devices):
    """
    Build the dynamic 'all' group.

    :param my_devices: mapping of inventory names to devices or groups
    :return: new dict holding every entry whose value is not a list (i.e. not a group)
    """
    return {
        name: entry
        for name, entry in my_devices.items()
        if not isinstance(entry, list)
    }


def obtain_netmiko_filename(device_name):
    """
    Build the per-device file name inside the netmiko tmp directory.

    :param device_name: name used as the file stem
    :return: "<netmiko tmp dir>/<device_name>.txt"
    """
    tmp_dir = find_netmiko_dir()[1]
    return f"{tmp_dir}/{device_name}.txt"


def write_tmp_file(device_name, output):
    """
    Write ``output`` to the device's file in the netmiko tmp directory.

    :param device_name: device whose file (see obtain_netmiko_filename) is written
    :param output: text to store; any previous file content is replaced
    :return: name of the file that was written
    """
    target = obtain_netmiko_filename(device_name)
    with open(target, mode="w") as handle:
        handle.write(output)
    return target


def ensure_dir_exists(verify_dir):
    """
    Ensure directory exists. Create if necessary.

    :param verify_dir: directory path; missing parent directories are created too
    :raises ValueError: if ``verify_dir`` already exists but is not a directory
    """
    try:
        # exist_ok removes the window between "does it exist?" and "create it":
        # a directory created concurrently by another process is simply accepted.
        os.makedirs(verify_dir, exist_ok=True)
    except FileExistsError:
        # With exist_ok=True this is only raised when the existing path is not a
        # directory; keep reporting that case as ValueError.
        raise ValueError(f"{verify_dir} is not a directory") from None


def find_netmiko_dir():
    """
    Determine netmiko's base directory and its tmp sub-directory.

    The NETMIKO_DIR environment variable takes precedence over the
    NETMIKO_BASE_DIR default; a leading "~" is expanded in either case.

    :return: tuple of (base directory, "<base directory>/tmp")
    :raises ValueError: if the base directory is "/"
    """
    configured = (
        os.environ["NETMIKO_DIR"] if "NETMIKO_DIR" in os.environ else NETMIKO_BASE_DIR
    )
    base_dir = os.path.expanduser(configured)
    if base_dir == "/":
        raise ValueError("/ cannot be netmiko_base_dir")
    return base_dir, f"{base_dir}/tmp"


def write_bytes(out_data, encoding="ascii"):
    """
    Return ``out_data`` as bytes (legacy Python2/Python3 compatibility helper).

    str input is encoded as UTF-8 when ``encoding`` equals "utf-8"; for every other
    ``encoding`` value it is encoded as ASCII and unencodable characters are
    dropped. bytes input is returned unchanged.

    :param out_data: str or bytes
    :param encoding: only the value "utf-8" changes the result
    :raises ValueError: if ``out_data`` is neither str nor bytes
    """
    if isinstance(out_data, bytes):
        return out_data
    if isinstance(out_data, str):
        codec_args = ("utf-8",) if encoding == "utf-8" else ("ascii", "ignore")
        return out_data.encode(*codec_args)
    raise ValueError(
        "Invalid value for out_data neither unicode nor byte string: {}".format(
            out_data
        )
    )


def check_serial_port(name):
    """
    Return a valid COM port for ``name``.

    :param name: search expression handed to ``serial.tools.list_ports.grep``
    :return: element 0 of the first matching port entry
        (presumably the port's device name -- confirm against pyserial)
    :raises ValueError: if pyserial is not installed, or if nothing matches (the
        message then lists the ports reported by ``comports()``)
    """
    if not PYSERIAL_INSTALLED:
        raise ValueError(
            "\npyserial is not installed. Please PIP install pyserial:\n\n"
            "pip install pyserial\n\n"
        )

    try:
        return next(serial.tools.list_ports.grep(name))[0]
    except StopIteration:
        # Every entry is followed by a comma, including the last one
        listing = "".join(
            f"{str(port)}," for port in list(serial.tools.list_ports.comports())
        )
        raise ValueError(
            f"device {name} not found. " + "available devices are: " + listing
        )


def get_template_dir(_skip_ntc_package=False):
    """
    Find and return the directory containing the TextFSM index file.

    Order of preference is:
    1) Find directory in `NET_TEXTFSM` Environment Variable.
    2) Check for pip installed `ntc-templates` location in this environment.
    3) ~/ntc-templates/templates.

    If `index` file is not found in any of these locations, raise ValueError

    :param _skip_ntc_package: testing hook; when True the pip installed package is
        treated as missing so that the home-directory fallback is used
    :return: directory containing the TextFSM index file
    :raises ValueError: if the selected directory or its `index` file is missing

    """

    msg = """
Directory containing TextFSM index file not found.

Please set the NET_TEXTFSM environment variable to point at the directory containing your TextFSM
index file.

Alternatively, `pip install ntc-templates` (if using ntc-templates).

"""

    # Try NET_TEXTFSM environment variable
    template_dir = os.environ.get("NET_TEXTFSM")
    if template_dir is not None:
        template_dir = os.path.expanduser(template_dir)
        index = os.path.join(template_dir, "index")
        if not os.path.isfile(index):
            # Assume only base ./ntc-templates specified
            template_dir = os.path.join(template_dir, "templates")

    else:
        # Try 'pip installed' ntc-templates
        try:
            # Arguments are positional on purpose: the parameter names of
            # importlib.resources.path differ between versions (package/resource
            # vs. anchor/path_names), so keywords raise TypeError on newer ones,
            # which the ModuleNotFoundError handler below would not catch.
            with importresources_path("ntc_templates", "templates") as posix_path:
                # Example: /opt/venv/netmiko/lib/python3.8/site-packages/ntc_templates/templates
                template_dir = str(posix_path)
                # This is for Netmiko automated testing
                if _skip_ntc_package:
                    raise ModuleNotFoundError()

        except ModuleNotFoundError:
            # Finally check in ~/ntc-templates/templates
            home_dir = os.path.expanduser("~")
            template_dir = os.path.join(home_dir, "ntc-templates", "templates")

    # Whichever location was selected must be a directory holding an index file
    index = os.path.join(template_dir, "index")
    if not os.path.isdir(template_dir) or not os.path.isfile(index):
        raise ValueError(msg)
    return os.path.abspath(template_dir)


def clitable_to_dict(cli_table):
    """
    Convert a TextFSM cli_table object into a list of dictionaries.

    :param cli_table: iterable of rows that also exposes a ``header`` sequence
    :return: one dict per row, keyed by the lower-cased header of each column
    """
    return [
        {cli_table.header[pos].lower(): cell for pos, cell in enumerate(row)}
        for row in cli_table
    ]


def _textfsm_parse(textfsm_obj, raw_output, attrs, template_file=None):
    """
    Run the TextFSM parse on a CliTable object and return the structured result.

    :param textfsm_obj: CliTable object used for parsing
    :param raw_output: CLI text to parse
    :param attrs: attribute dict passed to ``ParseCmd`` when no template file is given
    :param template_file: template name passed as ``templates=`` instead of ``attrs``
    :return: list of row dicts, or ``raw_output`` when the parse yields no rows or
        raises FileNotFoundError / CliTableError
    """
    try:
        if template_file is None:
            textfsm_obj.ParseCmd(raw_output, attrs)
        else:
            textfsm_obj.ParseCmd(raw_output, templates=template_file)
        parsed_rows = clitable_to_dict(textfsm_obj)
    except (FileNotFoundError, CliTableError):
        return raw_output
    # An empty result means nothing matched: hand back the untouched text
    return raw_output if parsed_rows == [] else parsed_rows


def get_structured_data(raw_output, platform=None, command=None, template=None):
    """
    Convert raw CLI output to structured data using a TextFSM template.

    Either pass ``template`` (path to a plain TextFSM template file) or pass both
    ``platform`` and ``command`` so that the template is looked up through the
    CliTable index file in the template directory.

    :raises ValueError: if ``template`` is None and platform/command are not both given
    """
    have_lookup_keys = platform is not None and command is not None
    attrs = {"Command": command, "Platform": platform} if have_lookup_keys else {}

    if template is not None:
        template_path = Path(os.path.expanduser(template))
        template_file = template_path.name
        # CliTable with no index will fall-back to a TextFSM parsing behavior
        textfsm_obj = clitable.CliTable(template_dir=template_path.parents[0])
        return _textfsm_parse(
            textfsm_obj, raw_output, attrs, template_file=template_file
        )

    if not attrs:
        raise ValueError("Either 'platform/command' or 'template' must be specified.")
    template_dir = get_template_dir()
    textfsm_obj = clitable.CliTable(os.path.join(template_dir, "index"), template_dir)
    return _textfsm_parse(textfsm_obj, raw_output, attrs)


def get_structured_data_ttp(raw_output, template=None):
    """
    Convert raw CLI output to structured data using TTP template.

    :param raw_output: CLI output to parse
    :param template: TTP template, handed to the ``ttp`` parser unchanged
    :return: the parser result in "raw" format; ``raw_output`` unchanged when no
        template is given or when parsing raises
    :raises ValueError: if TTP is not installed
    """
    if not TTP_INSTALLED:
        msg = "\nTTP is not installed. Please PIP install ttp:\n" "pip install ttp\n"
        raise ValueError(msg)

    if not template:
        # Without a template there is nothing to parse with. Return the text
        # untouched, like the failure path, instead of falling through to None.
        return raw_output

    try:
        ttp_parser = ttp(data=raw_output, template=template)
        ttp_parser.parse(one=True)
        return ttp_parser.result(format="raw")
    except Exception:
        # Deliberate best-effort: any parser failure yields the raw text
        return raw_output


def run_ttp_template(connection, template, res_kwargs, **kwargs):
    """
    Collect command output for every input of a TTP template and parse it.

    :param connection: Netmiko connection object
    :type connection: obj

    :param template: TTP template
    :type template: str

    :param res_kwargs: ``**res_kwargs`` arguments for TTP result method
    :type res_kwargs: dict

    :param kwargs: ``**kwargs`` for TTP object instantiation
    :type kwargs: dict

    :return: whatever the TTP ``result`` method returns for ``res_kwargs``
    """
    if not TTP_INSTALLED:
        raise ValueError(
            "\nTTP is not installed. Please PIP install ttp:\n" "pip install ttp\n"
        )

    parser = ttp(template=template, **kwargs)

    # Inputs declared by the template(s), keyed by template name then input name
    inputs_by_template = parser.get_input_load()
    log.debug("run_ttp_template: inputs load - {}".format(inputs_by_template))

    for template_name, inputs in inputs_by_template.items():
        for input_name, params in inputs.items():
            method = params.get("method", "send_command")
            method_kwargs = params.get("kwargs", {})
            commands = params.get("commands", None)

            # Skip inputs that name an unknown connection method ...
            if method not in dir(connection):
                log.warning(
                    "run_ttp_template: '{}' input, unsupported method '{}', skipping".format(
                        input_name, method
                    )
                )
                continue
            # ... or that list no commands
            if not commands:
                log.warning(
                    "run_ttp_template: '{}' input no commands to collect, skipping".format(
                        input_name
                    )
                )
                continue

            # Run each command on the device and join the outputs into one text
            collected = "\n".join(
                [
                    getattr(connection, method)(command_string=command, **method_kwargs)
                    for command in commands
                ]
            )

            parser.add_input(
                data=collected, input_name=input_name, template_name=template_name
            )

    # run parsing in single process
    parser.parse(one=True)

    return parser.result(**res_kwargs)


def get_structured_data_genie(raw_output, platform, command):
    """
    Convert raw CLI output to structured data using the Genie parsers.

    :param raw_output: CLI output to parse
    :param platform: netmiko platform name, optionally with a trailing suffix such
        as _ssh, _telnet or _serial
    :param command: command that produced ``raw_output``
    :return: Genie's parsed output; ``raw_output`` unchanged when the platform has
        no entry in the mapper below or when parser lookup/parsing raises
    :raises ValueError: on Python older than 3.4 or when Genie/PyATS are missing
    """
    if sys.version_info < (3, 4):
        raise ValueError("Genie requires Python >= 3.4")

    if not GENIE_INSTALLED:
        raise ValueError(
            "\nGenie and PyATS are not installed. Please PIP install both Genie and PyATS:\n"
            "pip install genie\npip install pyats\n"
        )

    if "cisco" not in platform:
        return raw_output

    genie_device_mapper = {
        "cisco_ios": "ios",
        "cisco_xe": "iosxe",
        "cisco_xr": "iosxr",
        "cisco_nxos": "nxos",
        "cisco_asa": "asa",
    }

    # platform might be _ssh, _telnet, _serial strip that off
    if platform.count("_") > 1:
        base_platform = platform.rsplit("_", 1)[0]
    else:
        base_platform = platform

    # Local name avoids shadowing the imported ``os`` module
    genie_os = genie_device_mapper.get(base_platform)
    if genie_os is None:
        return raw_output

    # Genie specific construct for doing parsing (based on Genie in Ansible)
    device = Device("new_device", os=genie_os)
    device.custom.setdefault("abstraction", {})
    device.custom["abstraction"]["order"] = ["os"]
    device.cli = AttrDict({"execute": None})
    try:
        # Test whether there is a parser for given command (return Exception if fails)
        get_parser(command, device)
        return device.parse(command, output=raw_output)
    except Exception:
        return raw_output


def select_cmd_verify(func):
    """
    Decorator that overrides the method's ``cmd_verify`` keyword argument.

    Whenever the instance's ``global_cmd_verify`` attribute is not None, its value
    is passed as ``cmd_verify``, replacing whatever the caller supplied.
    """

    @functools.wraps(func)
    def _with_global_cmd_verify(self, *args, **kwargs):
        override = self.global_cmd_verify
        if override is None:
            return func(self, *args, **kwargs)
        return func(self, *args, **{**kwargs, "cmd_verify": override})

    return _with_global_cmd_verify


def m_exec_time(func):
    """
    Decorator for methods that prints how long each call took.

    The printed line is "<str(func)>: Elapsed time: <end - start>", with both
    timestamps taken from ``datetime.now()``. The wrapped method's return value
    is passed through.
    """

    @functools.wraps(func)
    def _timed_method(self, *args, **kwargs):
        began = datetime.now()
        outcome = func(self, *args, **kwargs)
        elapsed = datetime.now() - began
        print(f"{str(func)}: Elapsed time: {elapsed}")
        return outcome

    return _timed_method


def f_exec_time(func):
    """
    Decorator for plain functions that prints how long each call took.

    The printed line is "Elapsed time: <end - start>", with both timestamps taken
    from ``datetime.now()``. The wrapped function's return value is passed through.
    """

    @functools.wraps(func)
    def _timed_function(*args, **kwargs):
        began = datetime.now()
        outcome = func(*args, **kwargs)
        elapsed = datetime.now() - began
        print(f"Elapsed time: {elapsed}")
        return outcome

    return _timed_function

Functions

def check_serial_port(name)

returns valid COM Port.

Source code
def check_serial_port(name):
    """
    Return a valid COM port for ``name``.

    :param name: search expression handed to ``serial.tools.list_ports.grep``
    :return: element 0 of the first matching port entry
        (presumably the port's device name -- confirm against pyserial)
    :raises ValueError: if pyserial is not installed, or if nothing matches (the
        message then lists the ports reported by ``comports()``)
    """
    if not PYSERIAL_INSTALLED:
        raise ValueError(
            "\npyserial is not installed. Please PIP install pyserial:\n\n"
            "pip install pyserial\n\n"
        )

    try:
        return next(serial.tools.list_ports.grep(name))[0]
    except StopIteration:
        # Every entry is followed by a comma, including the last one
        listing = "".join(
            f"{str(port)}," for port in list(serial.tools.list_ports.comports())
        )
        raise ValueError(
            f"device {name} not found. " + "available devices are: " + listing
        )
def clitable_to_dict(cli_table)

Converts TextFSM cli_table object to list of dictionaries.

Source code
def clitable_to_dict(cli_table):
    """
    Convert a TextFSM cli_table object into a list of dictionaries.

    :param cli_table: iterable of rows that also exposes a ``header`` sequence
    :return: one dict per row, keyed by the lower-cased header of each column
    """
    return [
        {cli_table.header[pos].lower(): cell for pos, cell in enumerate(row)}
        for row in cli_table
    ]
def display_inventory(my_devices)

Print out inventory devices and groups.

Source code
def display_inventory(my_devices):
    """
    Print the inventory as a device table followed by a group list.

    Entries whose value is a list are listed as groups, entries whose value is a
    dict are listed as devices together with their "device_type"; other values are
    ignored. The group 'all' is always listed.

    :param my_devices: mapping of inventory names to device dicts or group lists
    """
    separator = "-" * 40
    group_names = sorted(
        ["all"]
        + [name for name, entry in my_devices.items() if isinstance(entry, list)]
    )
    device_rows = sorted(
        (
            (name, entry["device_type"])
            for name, entry in my_devices.items()
            if isinstance(entry, dict)
        ),
        key=lambda row: row[0],
    )

    print("\nDevices:")
    print(separator)
    for name, platform in device_rows:
        label = f"  ({platform})"
        print(f"{name:<25}{label:>15}")
    print("\n\nGroups:")
    print(separator)
    for group in group_names:
        print(group)
    print()
def ensure_dir_exists(verify_dir)

Ensure directory exists. Create if necessary.

Source code
def ensure_dir_exists(verify_dir):
    """
    Ensure directory exists. Create if necessary.

    :param verify_dir: directory path; missing parent directories are created too
    :raises ValueError: if ``verify_dir`` already exists but is not a directory
    """
    try:
        # exist_ok removes the window between "does it exist?" and "create it":
        # a directory created concurrently by another process is simply accepted.
        os.makedirs(verify_dir, exist_ok=True)
    except FileExistsError:
        # With exist_ok=True this is only raised when the existing path is not a
        # directory; keep reporting that case as ValueError.
        raise ValueError(f"{verify_dir} is not a directory") from None
def f_exec_time(func)
Source code
def f_exec_time(func):
    """
    Decorator for plain functions that prints how long each call took.

    The printed line is "Elapsed time: <end - start>", with both timestamps taken
    from ``datetime.now()``. The wrapped function's return value is passed through.
    """

    @functools.wraps(func)
    def _timed_function(*args, **kwargs):
        began = datetime.now()
        outcome = func(*args, **kwargs)
        elapsed = datetime.now() - began
        print(f"Elapsed time: {elapsed}")
        return outcome

    return _timed_function
def find_cfg_file(file_name=None)

Search for the netmiko_tools inventory file in the following order: NETMIKO_TOOLS_CFG environment variable, current directory, home directory. Looks for a file named .netmiko.yml or netmiko.yml. NETMIKO_TOOLS_CFG may also point directly at a file.

Source code
def find_cfg_file(file_name=None):
    """
    Locate the netmiko_tools inventory file.

    Search order:
    1) ``file_name`` argument, when given and it names an existing file
    2) NETMIKO_TOOLS_CFG environment variable, when it points directly at a file
    3) Directory named by NETMIKO_TOOLS_CFG
    4) Current directory
    5) Home directory

    Inside a directory ``.netmiko.yml`` is preferred over ``netmiko.yml``.

    :param file_name: optional explicit path to the inventory file
    :return: path of the first inventory file found
    :raises IOError: if no inventory file is found in any location
    """
    if file_name and os.path.isfile(file_name):
        return file_name
    optional_path = os.environ.get("NETMIKO_TOOLS_CFG", "")
    if os.path.isfile(optional_path):
        return optional_path
    # Drop optional_path from the search when the variable is unset or empty
    candidates = (optional_path, ".", os.path.expanduser("~"))
    search_paths = [path for path in candidates if path]
    for path in search_paths:
        for base_name in (".netmiko.yml", "netmiko.yml"):
            # Test the literal path rather than globbing it: glob would interpret
            # '[', '*' or '?' inside the directory name as pattern syntax and miss
            # a file that really exists there.
            candidate = f"{path}/{base_name}"
            if os.path.isfile(candidate):
                return candidate
    raise IOError(
        ".netmiko.yml file not found in NETMIKO_TOOLS_CFG environment variable"
        " directory, current directory, or home directory."
    )
def find_netmiko_dir()

Check environment first, then default dir

Source code
def find_netmiko_dir():
    """
    Determine netmiko's base directory and its tmp sub-directory.

    The NETMIKO_DIR environment variable takes precedence over the
    NETMIKO_BASE_DIR default; a leading "~" is expanded in either case.

    :return: tuple of (base directory, "<base directory>/tmp")
    :raises ValueError: if the base directory is "/"
    """
    configured = (
        os.environ["NETMIKO_DIR"] if "NETMIKO_DIR" in os.environ else NETMIKO_BASE_DIR
    )
    base_dir = os.path.expanduser(configured)
    if base_dir == "/":
        raise ValueError("/ cannot be netmiko_base_dir")
    return base_dir, f"{base_dir}/tmp"
def get_structured_data(raw_output, platform=None, command=None, template=None)

Convert raw CLI output to structured data using TextFSM template.

You can use a straight TextFSM file, i.e. specify "template". If no template is specified, then you must use a CliTable index file.

Source code
def get_structured_data(raw_output, platform=None, command=None, template=None):
    """
    Convert raw CLI output to structured data using a TextFSM template.

    Either pass ``template`` (path to a plain TextFSM template file) or pass both
    ``platform`` and ``command`` so that the template is looked up through the
    CliTable index file in the template directory.

    :raises ValueError: if ``template`` is None and platform/command are not both given
    """
    have_lookup_keys = platform is not None and command is not None
    attrs = {"Command": command, "Platform": platform} if have_lookup_keys else {}

    if template is not None:
        template_path = Path(os.path.expanduser(template))
        template_file = template_path.name
        # CliTable with no index will fall-back to a TextFSM parsing behavior
        textfsm_obj = clitable.CliTable(template_dir=template_path.parents[0])
        return _textfsm_parse(
            textfsm_obj, raw_output, attrs, template_file=template_file
        )

    if not attrs:
        raise ValueError("Either 'platform/command' or 'template' must be specified.")
    template_dir = get_template_dir()
    textfsm_obj = clitable.CliTable(os.path.join(template_dir, "index"), template_dir)
    return _textfsm_parse(textfsm_obj, raw_output, attrs)
def get_structured_data_genie(raw_output, platform, command)
Source code
def get_structured_data_genie(raw_output, platform, command):
    """
    Convert raw CLI output to structured data using the Genie parsers.

    :param raw_output: CLI output to parse
    :param platform: netmiko platform name, optionally with a trailing suffix such
        as _ssh, _telnet or _serial
    :param command: command that produced ``raw_output``
    :return: Genie's parsed output; ``raw_output`` unchanged when the platform has
        no entry in the mapper below or when parser lookup/parsing raises
    :raises ValueError: on Python older than 3.4 or when Genie/PyATS are missing
    """
    if sys.version_info < (3, 4):
        raise ValueError("Genie requires Python >= 3.4")

    if not GENIE_INSTALLED:
        raise ValueError(
            "\nGenie and PyATS are not installed. Please PIP install both Genie and PyATS:\n"
            "pip install genie\npip install pyats\n"
        )

    if "cisco" not in platform:
        return raw_output

    genie_device_mapper = {
        "cisco_ios": "ios",
        "cisco_xe": "iosxe",
        "cisco_xr": "iosxr",
        "cisco_nxos": "nxos",
        "cisco_asa": "asa",
    }

    # platform might be _ssh, _telnet, _serial strip that off
    if platform.count("_") > 1:
        base_platform = platform.rsplit("_", 1)[0]
    else:
        base_platform = platform

    # Local name avoids shadowing the imported ``os`` module
    genie_os = genie_device_mapper.get(base_platform)
    if genie_os is None:
        return raw_output

    # Genie specific construct for doing parsing (based on Genie in Ansible)
    device = Device("new_device", os=genie_os)
    device.custom.setdefault("abstraction", {})
    device.custom["abstraction"]["order"] = ["os"]
    device.cli = AttrDict({"execute": None})
    try:
        # Test whether there is a parser for given command (return Exception if fails)
        get_parser(command, device)
        return device.parse(command, output=raw_output)
    except Exception:
        return raw_output
def get_structured_data_ttp(raw_output, template=None)

Convert raw CLI output to structured data using TTP template.

You can use a TTP template directly, i.e. specify "template".

Source code
def get_structured_data_ttp(raw_output, template=None):
    """
    Convert raw CLI output to structured data using TTP template.

    :param raw_output: CLI output to parse
    :param template: TTP template, handed to the ``ttp`` parser unchanged
    :return: the parser result in "raw" format; ``raw_output`` unchanged when no
        template is given or when parsing raises
    :raises ValueError: if TTP is not installed
    """
    if not TTP_INSTALLED:
        msg = "\nTTP is not installed. Please PIP install ttp:\n" "pip install ttp\n"
        raise ValueError(msg)

    if not template:
        # Without a template there is nothing to parse with. Return the text
        # untouched, like the failure path, instead of falling through to None.
        return raw_output

    try:
        ttp_parser = ttp(data=raw_output, template=template)
        ttp_parser.parse(one=True)
        return ttp_parser.result(format="raw")
    except Exception:
        # Deliberate best-effort: any parser failure yields the raw text
        return raw_output
def get_template_dir()

Find and return the directory containing the TextFSM index file.

Order of preference is: 1) the directory named by the NET_TEXTFSM environment variable; 2) the pip-installed ntc-templates location in this environment; 3) ~/ntc-templates/templates.

If index file is not found in any of these locations, raise ValueError

:return: directory containing the TextFSM index file

Source code
def get_template_dir(_skip_ntc_package=False):
    """
    Find and return the directory containing the TextFSM index file.

    Order of preference is:
    1) Find directory in `NET_TEXTFSM` Environment Variable.
    2) Check for pip installed `ntc-templates` location in this environment.
    3) ~/ntc-templates/templates.

    If `index` file is not found in any of these locations, raise ValueError

    :param _skip_ntc_package: testing hook; when True the pip installed package is
        treated as missing so that the home-directory fallback is used
    :return: directory containing the TextFSM index file
    :raises ValueError: if the selected directory or its `index` file is missing

    """

    msg = """
Directory containing TextFSM index file not found.

Please set the NET_TEXTFSM environment variable to point at the directory containing your TextFSM
index file.

Alternatively, `pip install ntc-templates` (if using ntc-templates).

"""

    # Try NET_TEXTFSM environment variable
    template_dir = os.environ.get("NET_TEXTFSM")
    if template_dir is not None:
        template_dir = os.path.expanduser(template_dir)
        index = os.path.join(template_dir, "index")
        if not os.path.isfile(index):
            # Assume only base ./ntc-templates specified
            template_dir = os.path.join(template_dir, "templates")

    else:
        # Try 'pip installed' ntc-templates
        try:
            # Arguments are positional on purpose: the parameter names of
            # importlib.resources.path differ between versions (package/resource
            # vs. anchor/path_names), so keywords raise TypeError on newer ones,
            # which the ModuleNotFoundError handler below would not catch.
            with importresources_path("ntc_templates", "templates") as posix_path:
                # Example: /opt/venv/netmiko/lib/python3.8/site-packages/ntc_templates/templates
                template_dir = str(posix_path)
                # This is for Netmiko automated testing
                if _skip_ntc_package:
                    raise ModuleNotFoundError()

        except ModuleNotFoundError:
            # Finally check in ~/ntc-templates/templates
            home_dir = os.path.expanduser("~")
            template_dir = os.path.join(home_dir, "ntc-templates", "templates")

    # Whichever location was selected must be a directory holding an index file
    index = os.path.join(template_dir, "index")
    if not os.path.isdir(template_dir) or not os.path.isfile(index):
        raise ValueError(msg)
    return os.path.abspath(template_dir)
def load_devices(file_name=None)

Find and load .netmiko.yml file.

Source code
def load_devices(file_name=None):
    """
    Locate the netmiko-tools inventory file and return its parsed YAML contents.

    :param file_name: optional explicit path, forwarded to ``find_cfg_file``
    :return: whatever ``load_yaml_file`` returns for the located file
    """
    return load_yaml_file(find_cfg_file(file_name))
def load_yaml_file(yaml_file)

Read YAML file.

Source code
def load_yaml_file(yaml_file):
    """
    Parse a YAML file and return its contents.

    The interpreter is terminated through ``sys.exit`` when the yaml module cannot
    be imported or when reading the file raises IOError.

    :param yaml_file: path of the YAML file (read as UTF-8 text)
    :return: the object produced by ``yaml.safe_load``
    """
    try:
        import yaml
    except ImportError:
        sys.exit("Unable to import yaml module.")

    try:
        with io.open(yaml_file, mode="rt", encoding="utf-8") as stream:
            parsed = yaml.safe_load(stream)
    except IOError:
        sys.exit(f"Unable to open YAML file: {yaml_file}")
    return parsed
def m_exec_time(func)
Source code
def m_exec_time(func):
    """
    Decorator for methods that prints how long each call took.

    The printed line is "<str(func)>: Elapsed time: <end - start>", with both
    timestamps taken from ``datetime.now()``. The wrapped method's return value
    is passed through.
    """

    @functools.wraps(func)
    def _timed_method(self, *args, **kwargs):
        began = datetime.now()
        outcome = func(self, *args, **kwargs)
        elapsed = datetime.now() - began
        print(f"{str(func)}: Elapsed time: {elapsed}")
        return outcome

    return _timed_method
def obtain_all_devices(my_devices)

Dynamically create 'all' group.

Source code
def obtain_all_devices(my_devices):
    """
    Build the dynamic 'all' group.

    :param my_devices: mapping of inventory names to devices or groups
    :return: new dict holding every entry whose value is not a list (i.e. not a group)
    """
    return {
        name: entry
        for name, entry in my_devices.items()
        if not isinstance(entry, list)
    }
def obtain_netmiko_filename(device_name)

Create file name based on device_name.

Source code
def obtain_netmiko_filename(device_name):
    """
    Build the per-device file name inside the netmiko tmp directory.

    :param device_name: name used as the file stem
    :return: "<netmiko tmp dir>/<device_name>.txt"
    """
    tmp_dir = find_netmiko_dir()[1]
    return f"{tmp_dir}/{device_name}.txt"
def run_ttp_template(connection, template, res_kwargs, **kwargs)

Helper function to run TTP template parsing.

:param connection: Netmiko connection object :type connection: obj

:param template: TTP template :type template: str

:param res_kwargs: **res_kwargs arguments for TTP result method :type res_kwargs: dict

:param kwargs: **kwargs for TTP object instantiation :type kwargs: dict

Source code
def run_ttp_template(connection, template, res_kwargs, **kwargs):
    """
    Collect command output for every input of a TTP template and parse it.

    :param connection: Netmiko connection object
    :type connection: obj

    :param template: TTP template
    :type template: str

    :param res_kwargs: ``**res_kwargs`` arguments for TTP result method
    :type res_kwargs: dict

    :param kwargs: ``**kwargs`` for TTP object instantiation
    :type kwargs: dict

    :return: whatever the TTP ``result`` method returns for ``res_kwargs``
    """
    if not TTP_INSTALLED:
        raise ValueError(
            "\nTTP is not installed. Please PIP install ttp:\n" "pip install ttp\n"
        )

    parser = ttp(template=template, **kwargs)

    # Inputs declared by the template(s), keyed by template name then input name
    inputs_by_template = parser.get_input_load()
    log.debug("run_ttp_template: inputs load - {}".format(inputs_by_template))

    for template_name, inputs in inputs_by_template.items():
        for input_name, params in inputs.items():
            method = params.get("method", "send_command")
            method_kwargs = params.get("kwargs", {})
            commands = params.get("commands", None)

            # Skip inputs that name an unknown connection method ...
            if method not in dir(connection):
                log.warning(
                    "run_ttp_template: '{}' input, unsupported method '{}', skipping".format(
                        input_name, method
                    )
                )
                continue
            # ... or that list no commands
            if not commands:
                log.warning(
                    "run_ttp_template: '{}' input no commands to collect, skipping".format(
                        input_name
                    )
                )
                continue

            # Run each command on the device and join the outputs into one text
            collected = "\n".join(
                [
                    getattr(connection, method)(command_string=command, **method_kwargs)
                    for command in commands
                ]
            )

            parser.add_input(
                data=collected, input_name=input_name, template_name=template_name
            )

    # run parsing in single process
    parser.parse(one=True)

    return parser.result(**res_kwargs)
def select_cmd_verify(func)

Override function cmd_verify argument with global setting.

Source code
def select_cmd_verify(func):
    """
    Decorator that overrides the method's ``cmd_verify`` keyword argument.

    Whenever the instance's ``global_cmd_verify`` attribute is not None, its value
    is passed as ``cmd_verify``, replacing whatever the caller supplied.
    """

    @functools.wraps(func)
    def _with_global_cmd_verify(self, *args, **kwargs):
        override = self.global_cmd_verify
        if override is None:
            return func(self, *args, **kwargs)
        return func(self, *args, **{**kwargs, "cmd_verify": override})

    return _with_global_cmd_verify
def write_bytes(out_data, encoding='ascii')

Legacy for Python2 and Python3 compatible byte stream.

Source code
def write_bytes(out_data, encoding="ascii"):
    """
    Return ``out_data`` as bytes (legacy Python2/Python3 compatibility helper).

    str input is encoded as UTF-8 when ``encoding`` equals "utf-8"; for every other
    ``encoding`` value it is encoded as ASCII and unencodable characters are
    dropped. bytes input is returned unchanged.

    :param out_data: str or bytes
    :param encoding: only the value "utf-8" changes the result
    :raises ValueError: if ``out_data`` is neither str nor bytes
    """
    if isinstance(out_data, bytes):
        return out_data
    if isinstance(out_data, str):
        codec_args = ("utf-8",) if encoding == "utf-8" else ("ascii", "ignore")
        return out_data.encode(*codec_args)
    raise ValueError(
        "Invalid value for out_data neither unicode nor byte string: {}".format(
            out_data
        )
    )
def write_tmp_file(device_name, output)
Source code
def write_tmp_file(device_name, output):
    """
    Write ``output`` to the device's file in the netmiko tmp directory.

    :param device_name: device whose file (see obtain_netmiko_filename) is written
    :param output: text to store; any previous file content is replaced
    :return: name of the file that was written
    """
    target = obtain_netmiko_filename(device_name)
    with open(target, mode="w") as handle:
        handle.write(output)
    return target