Salta el contingut

Built-in Plugins API

NOS Plugins#

NOS plugins are at the heart of FakeNOS, they are what enables to realize its full potential.

Cisco IOS#

NOS module for Cisco IOS

CiscoIOS #

Bases: BaseDevice

Class that keeps track of the state of the Cisco IOS device.

Source code in fakenos/plugins/nos/platforms_py/cisco_ios.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class CiscoIOS(BaseDevice):
    """
    Class that keeps track of the state of the Cisco IOS device.
    """

    hola = "Hello test"

    def make_show_clock(self, base_prompt, current_prompt, command):
        "Return String in format '*11:54:03.018 UTC Sat Apr 16 2022'"
        return time.strftime("*%H:%M:%S.000 %Z %a %b %d %Y")

    def make_show_running_config(self, base_prompt, current_prompt, command):
        "Return String of running configuration"
        return self.render("cisco_ios/show_running-config.j2", base_prompt=base_prompt)

    def make_show_version(self, base_prompt, current_prompt, command):
        "Return String of system hardware and software status"
        return self.render("cisco_ios/show_version.j2", base_prompt=base_prompt)

make_show_clock(base_prompt, current_prompt, command) #

Return String in format '*11:54:03.018 UTC Sat Apr 16 2022'

Source code in fakenos/plugins/nos/platforms_py/cisco_ios.py
26
27
28
def make_show_clock(self, base_prompt, current_prompt, command):
    "Return String in format '*11:54:03.018 UTC Sat Apr 16 2022'"
    return time.strftime("*%H:%M:%S.000 %Z %a %b %d %Y")

make_show_running_config(base_prompt, current_prompt, command) #

Return String of running configuration

Source code in fakenos/plugins/nos/platforms_py/cisco_ios.py
30
31
32
def make_show_running_config(self, base_prompt, current_prompt, command):
    "Return String of running configuration"
    return self.render("cisco_ios/show_running-config.j2", base_prompt=base_prompt)

make_show_version(base_prompt, current_prompt, command) #

Return String of system hardware and software status

Source code in fakenos/plugins/nos/platforms_py/cisco_ios.py
34
35
36
def make_show_version(self, base_prompt, current_prompt, command):
    "Return String of system hardware and software status"
    return self.render("cisco_ios/show_version.j2", base_prompt=base_prompt)

Servers Plugins#

Servers Plugins acts as an access layer, simulating device connections.

ParamikoSshServer#

Bases: TCPServerBase

Class to implement an SSH server using paramiko as the SSH connection library.

Source code in fakenos/plugins/servers/ssh_server_paramiko.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
class ParamikoSshServer(TCPServerBase):
    """
    Class to implement an SSH server using paramiko
    as the SSH connection library.
    """

    # pylint: disable=too-many-instance-attributes
    # pylint: disable=too-many-arguments
    def __init__(
        self,
        shell: type,
        nos: Nos,
        nos_inventory_config: Dict,
        port: int,
        username: str,
        password: str,
        ssh_key_file: paramiko.rsakey.RSAKey = None,
        ssh_key_file_password: str = None,
        ssh_banner: str = "FakeNOS Paramiko SSH Server",
        shell_configuration: Dict = None,
        address: str = "127.0.0.1",
        timeout: int = 1,
        watchdog_interval: int = 1,
    ):
        super().__init__()

        self.nos: Nos = nos
        self.nos_inventory_config: Dict = nos_inventory_config
        self.shell: type = shell
        self.shell_configuration: Dict = shell_configuration or {}
        self.ssh_banner: str = ssh_banner
        self.username: str = username
        self.password: str = password
        self.port: int = port
        self.address: str = address
        self.timeout: int = timeout
        self.watchdog_interval: int = watchdog_interval

        if ssh_key_file:
            self._ssh_server_key: paramiko.rsakey.RSAKey = paramiko.RSAKey.from_private_key_file(
                ssh_key_file, ssh_key_file_password
            )
        else:
            self._ssh_server_key: paramiko.rsakey.RSAKey = paramiko.RSAKey(file_obj=io.StringIO(DEFAULT_SSH_KEY))

    def watchdog(self, is_running: threading.Event, run_srv: threading.Event, session: paramiko.Transport, shell: any):
        """
        Method to monitor server liveness and recover where possible.
        """
        while run_srv.is_set():
            if not session.is_alive():
                log.warning(
                    "ParamikoSshServer.watchdog - \
                        session not alive, stopping shell"
                )
                shell.stop()
                break

            if not is_running.is_set():
                shell.stop()

            time.sleep(self.watchdog_interval)

    def connection_function(self, client: socket.socket, is_running: threading.Event):
        shell_replied_event = threading.Event()
        run_srv = threading.Event()
        run_srv.set()

        # create the SSH transport object
        session = paramiko.Transport(client)
        session.add_server_key(self._ssh_server_key)

        # create the server
        server = ParamikoSshServerInterface(
            ssh_banner=self.ssh_banner,
            username=self.username,
            password=self.password,
        )

        # start the SSH server
        session.start_server(server=server)

        # create the channel and get the stdio
        channel = session.accept()
        channel_stdio = channel.makefile("rw")

        # create stdio for the shell
        shell_stdin, shell_stdout = TapIO(run_srv), TapIO(run_srv)

        # start intermediate thread to tap into
        # the channel_stdio->shell_stdin bytes stream
        channel_to_shell_tapper = threading.Thread(
            target=channel_to_shell_tap,
            args=(channel_stdio, shell_stdin, shell_replied_event, run_srv),
        )
        channel_to_shell_tapper.start()

        # start intermediate thread to tap into
        # the shell_stdout->channel_stdio bytes stream
        shell_to_channel_tapper = threading.Thread(
            target=shell_to_channel_tap,
            args=(channel_stdio, shell_stdout, shell_replied_event, run_srv),
        )
        shell_to_channel_tapper.start()

        # create the client shell
        client_shell = self.shell(
            stdin=shell_stdin,
            stdout=shell_stdout,
            nos=self.nos,
            nos_inventory_config=self.nos_inventory_config,
            is_running=is_running,
            **self.shell_configuration,
        )

        # start watchdog thread
        watchdog_thread = threading.Thread(target=self.watchdog, args=(is_running, run_srv, session, client_shell))
        watchdog_thread.start()

        # running this command will block this function until shell exits
        client_shell.start()
        log.debug("ParamikoSshServer.connection_function stopped shell thread")

        # kill this server threads - watchdog, TapIO,
        # shell_to_channel_tapper and channel_to_shell_tapper
        run_srv.clear()
        log.debug("ParamikoSshServer.connection_function stopped server threads")

        # After execution continues, we can close the session
        session.close()
        log.debug("ParamikoSshServer.connection_function closed transport %s", session)

watchdog(is_running, run_srv, session, shell) #

Method to monitor server liveness and recover where possible.

Source code in fakenos/plugins/servers/ssh_server_paramiko.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def watchdog(self, is_running: threading.Event, run_srv: threading.Event, session: paramiko.Transport, shell: any):
    """
    Method to monitor server liveness and recover where possible.
    """
    while run_srv.is_set():
        if not session.is_alive():
            log.warning(
                "ParamikoSshServer.watchdog - \
                    session not alive, stopping shell"
            )
            shell.stop()
            break

        if not is_running.is_set():
            shell.stop()

        time.sleep(self.watchdog_interval)

Shell Plugins#

Shell Plugins act as a plumbing between servers plugins and NOS plugins, gluing, connecting them together.

CMDShell#

Bases: Cmd

Custom shell class to interact with NOS.

Source code in fakenos/plugins/shell/cmd_shell.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class CMDShell(Cmd):
    """
    Custom shell class to interact with NOS.
    """

    use_rawinput = False

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        stdin,
        stdout,
        nos,
        nos_inventory_config,
        base_prompt,
        is_running,
        intro="Custom SSH Shell",
        ruler="",
        completekey="tab",
        newline="\r\n",
    ):
        self.nos: Nos = nos
        self.ruler = ruler
        self.intro = intro
        self.base_prompt = base_prompt
        self.newline = newline
        self.prompt = nos.initial_prompt.format(base_prompt=base_prompt)
        self.is_running = is_running

        # form commands
        self.commands = {
            **copy.deepcopy(BASIC_COMMANDS),
            **copy.deepcopy(nos.commands or {}),
            **copy.deepcopy(nos_inventory_config.get("commands", {})),
        }
        # call the base constructor of cmd.Cmd, with our own stdin and stdout
        super().__init__(
            completekey=completekey,
            stdin=stdin,
            stdout=stdout,
        )

    def start(self):
        """Method to start the shell"""
        self.cmdloop()

    def stop(self):
        """Method to stop the shell"""
        self.stdin.write("exit" + self.newline)

    def writeline(self, value):
        """Method to write a line to stdout with newline at the end"""
        for line in str(value).splitlines():
            self.stdout.write(line + self.newline)

    def emptyline(self):
        """This method to do nothing if empty line entered"""

    def reload_commands(self, changed_files: list):
        """Method to reload commands"""
        for file in changed_files:
            self.nos.from_file(file)
            self.commands.update(self.nos.commands)

    def precmd(self, line):
        """Method to return line before processing the command"""
        if os.environ.get("FAKENOS_RELOAD_COMMANDS"):
            changed_files = get_files_changed(nos.__path__[0])
            if changed_files:
                log.debug("Reloading... Files changed: %s", changed_files)
                self.reload_commands(changed_files)
        return line

    # pylint: disable=unused-argument
    def postcmd(self, stop, line):
        """Method to return stop value to stop the shell"""
        return stop

    # pylint: disable=unused-argument
    def do_help(self, arg):
        """Method to return help for commands"""
        lines = {}  # dict of {cmd: cmd_help}
        width = 0  # record longest command width for padding
        # form help for all commands
        for cmd, cmd_data in self.commands.items():
            # skip special commands
            if cmd.startswith("_") and cmd.endswith("_"):
                continue
            # skip commands that does not match current prompt
            if not self._check_prompt(cmd_data.get("prompt")):
                continue
            lines[cmd] = cmd_data.get("help", "")
            width = max(width, len(cmd))
        # form help lines
        help_msg = []
        for k, v in lines.items():
            padding = " " * (width - len(k)) + "  "
            help_msg.append(f"{k}{padding}{v}")
        self.writeline(self.newline.join(help_msg))

    def _check_prompt(self, prompt_: Union[str, List[str]]):
        """
        Helper method to check if prompt_ matches current prompt

        :param prompt_: (string or None)  prompt to check
        """
        # prompt_ is None if no 'prompt' key defined for command
        if prompt_ is None:
            return True
        if isinstance(prompt_, str):
            return self.prompt == prompt_.format(base_prompt=self.base_prompt)
        return any(self.prompt == i.format(base_prompt=self.base_prompt) for i in prompt_)

    # pylint: disable=too-many-branches
    def default(self, line):
        """Method called if no do_xyz methods found"""
        log.debug("shell.default '%s' running command '%s'", self.base_prompt, [line])
        ret = self.commands["_default_"]["output"]
        try:
            cmd_data = self.commands[line]
            if "alias" in cmd_data:
                cmd_data = {**self.commands[cmd_data.pop("alias")], **cmd_data}
            if self._check_prompt(cmd_data.get("prompt")):
                ret = cmd_data["output"]
                if callable(ret):
                    ret = ret(
                        self.nos.device,
                        base_prompt=self.base_prompt,
                        current_prompt=self.prompt,
                        command=line,
                    )
                    if isinstance(ret, dict):
                        if "new_prompt" in ret:
                            self.prompt = ret["new_prompt"].format(base_prompt=self.base_prompt)
                        ret = ret["output"]
                if "new_prompt" in cmd_data:
                    self.prompt = cmd_data["new_prompt"].format(base_prompt=self.base_prompt)
            else:

                log.warning(
                    "'%s' command prompt '%s' not matching current prompt '%s'",
                    line,
                    (
                        ", ".join(cmd_data.get("prompt", []))
                        if isinstance(cmd_data.get("prompt"), list)
                        else cmd_data.get("prompt", "")
                    ),
                    self.prompt,
                )
        except KeyError:
            log.error("shell.default '%s' command '%s' not found", self.base_prompt, [line])
            if callable(ret):
                ret = "An error occurred related to the command function"
        # pylint: disable=broad-except
        except ValueError:
            log.error("Output is still a callable")
            ret = "An error occurred"
        except (Exception,) as e:
            log.error("An error occurred: %s", str(e))
            ret = traceback.format_exc()
            ret = ret.replace("\n", self.newline)
        # check if need to exit
        if ret is True or not self.is_running.is_set():
            return True
        if ret is not None:
            try:
                ret = ret.format(base_prompt=self.base_prompt)
            except KeyError:
                log.error("Error in formatting output")
            self.writeline(ret)
        return False

default(line) #

Method called if no do_xyz methods found

Source code in fakenos/plugins/shell/cmd_shell.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def default(self, line):
    """Method called if no do_xyz methods found"""
    log.debug("shell.default '%s' running command '%s'", self.base_prompt, [line])
    ret = self.commands["_default_"]["output"]
    try:
        cmd_data = self.commands[line]
        if "alias" in cmd_data:
            cmd_data = {**self.commands[cmd_data.pop("alias")], **cmd_data}
        if self._check_prompt(cmd_data.get("prompt")):
            ret = cmd_data["output"]
            if callable(ret):
                ret = ret(
                    self.nos.device,
                    base_prompt=self.base_prompt,
                    current_prompt=self.prompt,
                    command=line,
                )
                if isinstance(ret, dict):
                    if "new_prompt" in ret:
                        self.prompt = ret["new_prompt"].format(base_prompt=self.base_prompt)
                    ret = ret["output"]
            if "new_prompt" in cmd_data:
                self.prompt = cmd_data["new_prompt"].format(base_prompt=self.base_prompt)
        else:

            log.warning(
                "'%s' command prompt '%s' not matching current prompt '%s'",
                line,
                (
                    ", ".join(cmd_data.get("prompt", []))
                    if isinstance(cmd_data.get("prompt"), list)
                    else cmd_data.get("prompt", "")
                ),
                self.prompt,
            )
    except KeyError:
        log.error("shell.default '%s' command '%s' not found", self.base_prompt, [line])
        if callable(ret):
            ret = "An error occurred related to the command function"
    # pylint: disable=broad-except
    except ValueError:
        log.error("Output is still a callable")
        ret = "An error occurred"
    except (Exception,) as e:
        log.error("An error occurred: %s", str(e))
        ret = traceback.format_exc()
        ret = ret.replace("\n", self.newline)
    # check if need to exit
    if ret is True or not self.is_running.is_set():
        return True
    if ret is not None:
        try:
            ret = ret.format(base_prompt=self.base_prompt)
        except KeyError:
            log.error("Error in formatting output")
        self.writeline(ret)
    return False

do_help(arg) #

Method to return help for commands

Source code in fakenos/plugins/shell/cmd_shell.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def do_help(self, arg):
    """Method to return help for commands"""
    lines = {}  # dict of {cmd: cmd_help}
    width = 0  # record longest command width for padding
    # form help for all commands
    for cmd, cmd_data in self.commands.items():
        # skip special commands
        if cmd.startswith("_") and cmd.endswith("_"):
            continue
        # skip commands that does not match current prompt
        if not self._check_prompt(cmd_data.get("prompt")):
            continue
        lines[cmd] = cmd_data.get("help", "")
        width = max(width, len(cmd))
    # form help lines
    help_msg = []
    for k, v in lines.items():
        padding = " " * (width - len(k)) + "  "
        help_msg.append(f"{k}{padding}{v}")
    self.writeline(self.newline.join(help_msg))

emptyline() #

This method to do nothing if empty line entered

Source code in fakenos/plugins/shell/cmd_shell.py
84
85
def emptyline(self):
    """This method to do nothing if empty line entered"""

postcmd(stop, line) #

Method to return stop value to stop the shell

Source code in fakenos/plugins/shell/cmd_shell.py
103
104
105
def postcmd(self, stop, line):
    """Method to return stop value to stop the shell"""
    return stop

precmd(line) #

Method to return line before processing the command

Source code in fakenos/plugins/shell/cmd_shell.py
 93
 94
 95
 96
 97
 98
 99
100
def precmd(self, line):
    """Method to return line before processing the command"""
    if os.environ.get("FAKENOS_RELOAD_COMMANDS"):
        changed_files = get_files_changed(nos.__path__[0])
        if changed_files:
            log.debug("Reloading... Files changed: %s", changed_files)
            self.reload_commands(changed_files)
    return line

reload_commands(changed_files) #

Method to reload commands

Source code in fakenos/plugins/shell/cmd_shell.py
87
88
89
90
91
def reload_commands(self, changed_files: list):
    """Method to reload commands"""
    for file in changed_files:
        self.nos.from_file(file)
        self.commands.update(self.nos.commands)

start() #

Method to start the shell

Source code in fakenos/plugins/shell/cmd_shell.py
71
72
73
def start(self):
    """Method to start the shell"""
    self.cmdloop()

stop() #

Method to stop the shell

Source code in fakenos/plugins/shell/cmd_shell.py
75
76
77
def stop(self):
    """Method to stop the shell"""
    self.stdin.write("exit" + self.newline)

writeline(value) #

Method to write a line to stdout with newline at the end

Source code in fakenos/plugins/shell/cmd_shell.py
79
80
81
82
def writeline(self, value):
    """Method to write a line to stdout with newline at the end"""
    for line in str(value).splitlines():
        self.stdout.write(line + self.newline)

Tape Plugins#

Idea - Tape Plugins will allow to record interactions with real devices and build NOS plugins automatically using gathered data.