1from asyncutils import __version__ as V, config as C, exceptions as E
2from asyncutils._internal import patch as P, running_console as R
3from asyncutils._internal.helpers import fullname
4from asyncutils._internal.submodules import console_all as __all__
5import sys as S
6from asyncio import iscoroutine
7from asyncio.futures import _chain_future
8from itertools import repeat
9from os import getenv as g
10try: from _pyrepl.console import InteractiveColoredConsole as B # ty: ignore[unresolved-import]
11except ImportError: from code import InteractiveConsole as B; C.basic_repl = True
12_s = object()
13_f = '',
[docs]
14class ConsoleBase(B):
15 LOCALS_HANDLERS, interrupt_hooks, memerr_hooks, disallow_subclass_msg = __import__('collections').ChainMap(), (), (lambda self, f=S._clear_internal_caches if S.version_info >= (3, 13) else S._clear_type_cache, g=__import__('gc').collect, d=__import__('logging').getLogger('asyncutils').debug: f() or self.write('MemoryError\n') or d('Emergency garbage collection after MemoryError: %s objects collected in total', g()),), 'cannot subclass %s'; default_local_exit = _unsubclassable = False # noqa: B008
16 if C.basic_repl: CAN_USE_PYREPL = False # pragma: no cover
17 else: from _pyrepl.main import CAN_USE_PYREPL # ty: ignore[unresolved-import]
18 def __init__(self, loop, mod=None, modname=None, *, context_factory=__import__('_contextvars').copy_context, _f=_f, _s=_s, _m='cannot %s event loop within REPL', g=globals().get, _={'__cached__': 'cached', '__file__': 'origin', '__package__': 'parent', '__loader__': 'submodule_search_locations'}, _r=E.raise_exc): # noqa: B006
19 if (t := type(self)) is ConsoleBase: _r(TypeError, 'cannot instantiate asyncutils.console.ConsoleBase', notes='tip: subclass instead')
20 S.audit(fullname(t), loop)
21 if modname is None: modname = self.NAME
22 if mod is None: mod = __import__(modname, fromlist=_f if '.' in modname else ())
23 def stop(p=None, /, _=loop.stop, *, asap=False):
24 if p is _s: _() if asap else loop.call_soon_threadsafe(_)
25 else: raise RuntimeError(_m%'stop')
26 def close(p=None, /, _=loop.close):
27 if p is _s: _()
28 else: raise RuntimeError(_m%'close')
29 loop.stop, loop.close, self._internal_is_running, self.memory_errors, self._loop, self.context, self.exc, self._fut, (d := {})[modname] = stop, close, False, 0, loop, context_factory(), None, None, mod; super().__init__(d, '<stdin>', local_exit=self.default_local_exit); self.compile.compiler.flags |= 0x2000; d.update(__name__='__main__', __doc__='A console with top-level await support.', __spec__=__spec__, __annotations__={})
30 if (H := S.hexversion) > 0x30e00a0: d['__annotate__'] = g('__annotate__') # cover: off
31 if H < 0x30f00a1:
32 for k in _: d[k] = g(k)
33 elif H < 0x30f00f0:
34 for k, v in _.items(): d[k] = getattr(__spec__, v) # cover: on
35 if callable(h := self.LOCALS_HANDLERS.get(modname)): h(d)
36 elif h is not None: raise TypeError(f'asyncutils.console.ConsoleBase: locals handler for module {modname!r} should be callable, not {fullname(h)!r}')
[docs]
37 def refresh(self):
38 if not ((F := self._fut) is None or F.done()): F.cancel()
39 def __callback(self, fut, code, /, *, makef=type(refresh), corocheck=iscoroutine, futchain=_chain_future):
40 try: c = makef(code, self.locals)()
41 except SystemExit as e: return self.set_return_code(e)
42 except BaseException as e:
43 if isinstance(e, KeyboardInterrupt): self.interrupt()
44 elif isinstance(e, MemoryError): self.memoryerror()
45 return fut.set_exception(e)
46 if not corocheck(c): return fut.set_result(c)
47 try: futchain(_ := self._loop.create_task(c, context=self.context), fut); self._fut = _
48 except BaseException as e: fut.set_exception(e)
[docs]
49 def showtraceback(self):
50 if (t := S.exc_info())[2] is not None: self._showtraceback(*t, '')
[docs]
51 def runcode(self, code, *, futimpl=__import__('concurrent.futures._base', fromlist=_f).Future, dont_show_traceback=(KeyboardInterrupt, MemoryError, SyntaxError), threadsafe=True):
52 getattr(self._loop, 'call_soon_threadsafe' if threadsafe else 'call_soon')(self.__callback, F := futimpl(), code, context=self.context)
53 try: return F.result()
54 except SystemExit as e: self.set_return_code(e)
55 except BaseException as e:
56 if not isinstance(e, dont_show_traceback): self.showtraceback()
57 return getattr(self, 'STATEMENT_FAILED', None)
[docs]
58 def interact(self, banner=None, *, ps1='>>> ', _f=_f, _s=_s, _q=C.silent, _o=type('', (), {'write': lambda *_: None, 'flush': lambda _, /: None})(), p=g('PYTHONSTARTUP')): # noqa: B008
59 x = False; self.write_special(self.BANNER if banner is None else banner)
60 try:
61 if p and not S.flags.ignore_environment: # pragma: no cover
62 with __import__('tokenize').open(p) as f:
63 if _q: S.stdout, _o = _o, S.stdout
64 S.audit('cpython.run_startup', p); exec(compile(f.read(), p, 'exec'), self.locals) # noqa: S102
65 if _q: S.stdout = _o
66 if (p := getattr(S, 'ps1', None)) is None: p, x = ps1, True
67 if self.CAN_USE_PYREPL: self._interact_hook(f'{(t := __import__('_colorize').get_theme().syntax).prompt}{p}{(r := t.reset)}'.lstrip(), t.keyword, r, t.builtin); __import__('_pyrepl.simple_interact', fromlist=_f).run_multiline_interactive_console(self)
68 else: self._interact_hook(p, '', '', ''); super().interact('', '')
69 finally: self._loop.stop(_s); S.ps1 = ps1 if x else getattr(S, 'ps1', ps1) if p is None else p
[docs]
70 def _interact_hook(self, ps1, kcolor, reset, fcolor): n, S.ps1 = self.NAME, ps1; self.write_special(f'{ps1}{kcolor}import{reset} {n}\n{ps1}{kcolor}from{reset} {n} {kcolor}import{reset} *\n') # noqa: ARG002
[docs]
71 def prehook(self, max_memerrs): self._max_memerrs, self._internal_is_running = 3 if max_memerrs is None else max_memerrs, True
[docs]
72 def posthook(self): self._internal_is_running = False
[docs]
73 def write_special(self, msg): self.write(msg)
[docs]
74 def interrupt(self, _=_f, m='\nKeyboardInterrupt\n'):
75 if not self.CAN_USE_PYREPL: self.write(m)
76 elif callable(f := getattr(__import__('_pyrepl.readline', fromlist=_)._get_reader().threading_hook, 'add', None)): f('')
77 self.refresh()
[docs]
78 def memoryerror(self):
79 if (m := self.memory_errors) == self._max_memerrs: return self.set_return_code(f'ERROR: Exceeded MemoryError threshold: {m}\n')
80 self.memory_errors = m+1
81 for _ in self.memerr_hooks: _(self)
82 self.refresh()
[docs]
83 def set_return_code(self, e, /, _=_s): self.exc = e if isinstance(e, SystemExit) else SystemExit(*(e.args if isinstance(e, BaseException) else (e,))); self._loop.stop(_)
[docs]
84 def __init_subclass__(cls, *, name=None, native_handler=None, default_local_exit=True, disallow_subclass_msg=None, other_handlers=None, additional_interrupt_hooks=(), additional_memerr_hooks=(), template=f'%(name)s REPL (version %(version)s) running on {S.platform}\nType "help", "copyright", "credits" or "license" for more information, "clear" to clear the terminal, and "exit" or "quit" to exit.\n%(description)s\n', **k):
85 if cls._unsubclassable: raise TypeError(cls.disallow_subclass_msg%fullname(cls))
86 if name is None: name = cls.__qualname__.casefold().removesuffix('console')
87 if other_handlers is None: other_handlers = {}
88 k['name'] = cls.NAME = name; (f := k.setdefault)('version', 'unknown'); f('description', 'Enjoy!'); cls.BANNER, cls.LOCALS_HANDLERS, cls.interrupt_hooks, cls.memerr_hooks, cls.default_local_exit, cls._unsubclassable, other_handlers[name] = template%k, cls.LOCALS_HANDLERS.new_child(other_handlers), (*cls.interrupt_hooks, *additional_interrupt_hooks), (*cls.memerr_hooks, *additional_memerr_hooks), default_local_exit, disallow_subclass_msg is not None, native_handler
89 if disallow_subclass_msg: cls.disallow_subclass_msg = disallow_subclass_msg
90 def __repr__(self): return f'{fullname(self)}({self._loop!r}, local_exit={self.local_exit})'
91 @property
92 def is_running(self): return self._internal_is_running
[docs]
93 def run(self, *, exitmsg='Thank you for using %s!\nExiting REPL...\n', threadname='<%s REPL thread>', max_memerrs=None, always_run_interactive=bool(S.flags.inspect), always_install_completer=False, suppress_asyncio_warnings=False, suppress_unawaited_coroutine_warnings=False, _=frozenset(('win32', 'cygwin', 'android', 'ios', 'wasi'))):
94 self.prehook(max_memerrs); S.audit(f'{fullname(self)}.run', id(self)); l, w, n = self._loop, S.stderr.write, self.NAME
95 if always_run_interactive or S.stdin.isatty():
96 S.audit('cpython.run_stdin'); __import__('threading').Thread(name=threadname%n, target=self.interact, daemon=True).start()
97 if callable(h := getattr(S, i := '__interactivehook__', None)): # pragma: no cover
98 S.audit('cpython.run_interactivehook', h)
99 try: h()
100 except: w(f'Error running {self!r}!\nFailed calling sys.__interactivehook__\n'); __import__('traceback').print_exc() # noqa: E722
101 if always_install_completer or (S.platform not in _ and h.__module__ == 'site' and h.__name__ == h.__qualname__ == 'register_readline'):
102 try: __import__('readline').set_completer(__import__('rlcompleter').Completer(self.locals).complete)
103 except ImportError: w('Failed to install readline completer\n')
104 elif h is not None: w('Removing sys.__interactivehook__ since it is not callable\n'); delattr(S, i)
105 while True:
106 try: l.run_forever(); break
107 except KeyboardInterrupt: self.interrupt()
108 except MemoryError: self.memoryerror()
109 else: self.write_special(self.BANNER); self.runcode(compile((l := S.stdin).read(), getattr(l, 'name', '<stdin>'), 'exec'))
110 try: self.posthook()
111 except SystemExit: raise
112 except BaseException as e: w(f'{fullname(e)} occurred in posthook of {self!r}: {e}\n')
113 finally:
114 if suppress_asyncio_warnings: P.patch_aio_logs()
115 if suppress_unawaited_coroutine_warnings: P.patch_unawaited_coroutine_warnings()
116 self.write_special(exitmsg%n)
117 return self.retcode
118 @property
119 def retcode(self): return 0 if (e := self.exc) is None else e.code
120 P.patch_method_signatures((run, '*, exitmsg=None, threadname=None, max_memerrs=None, always_run_interactive=None, always_install_completer=False, suppress_asyncio_warnings=False, suppress_unawaited_coroutine_warnings=False'), (interrupt, ''), (set_return_code, 'e, /'), (__init__, 'loop, mod=None, modname=None, *, context_factory={}'), (__callback, 'fut, code, /, *, makef={0}, corocheck={0}, futchain={0}'), (interact, "banner=None, *, ps1='>>> '")); P.patch_classmethod_signatures((__init_subclass__, '*, name=None, native_handler=None, default_local_exit=True, disallow_subclass_msg=None, other_handlers=None, additional_interrupt_hooks=(), additional_memerr_hooks=(), template={}, version=None, description=None, **k'))
121def _(d, /):
122 def load_all(_=d):
123 for k, v in _.items(): _[k] = v if (g := getattr(v, 'load', None)) is None else g()
124 load_all.__qualname__, load_all.__module__, load_all.__text_signature__ = load_all.__name__, 'asyncutils', '()'; return load_all # ty: ignore[unresolved-attribute]
[docs]
125class AsyncUtilsConsole(ConsoleBase, version=V, description='asyncutils is a multi-purpose and efficient asynchronous utilties library.\nYou can use await statements directly instead of asyncio.run for quick testing.\nAll the submodules of asyncutils are also loaded into the namespace.\nDo not use functions such as util.sync_await in this REPL, since they are bound to cause deadlocks.', native_handler=lambda d, /, v=V, _=_f, r=_: (u := d.update)(m := __import__('asyncutils._internal.initialize', fromlist=_).s) or u(__version__=v, load_all=r(m)), default_local_exit=True, disallow_subclass_msg='cannot subclass %s; subclass asyncutils.console.ConsoleBase instead'):
126 def __repr__(self): return f'<{"running" if self.is_running else "idle"} asyncutils console at {id(self):#x}>'
127 @property
128 def is_running(self):
129 if not self._loop.is_running(): self._internal_is_running = False; return False
130 if self._internal_is_running == (b := R.getc() is self): return b
131 if b: self._internal_is_running = True
132 else: self.set_return_code(1)
133 S.stderr.write('User tampered with console-internal state!\n'); return False
[docs]
134 def _interact_hook(self, ps1, kcolor, reset, fcolor):
135 super()._interact_hook(ps1, kcolor, reset, fcolor)
136 if R.should_write_load_all(): self.write_special(f'{ps1}{fcolor}load_all{reset}()\n')
[docs]
137 def write_special(self, msg, _=C.silent):
138 if not _: self.write(msg)
[docs]
139 def prehook(self, max_memerrs, _=C.max_memerrs, _r='this console is already running', _a='another console is running'):
140 if self._internal_is_running: raise RuntimeError(_r)
141 if r := R.getc(): raise RuntimeError(_r if r is self else _a)
142 R.setc(self); super().prehook(_ if max_memerrs is None else max_memerrs) # ty: ignore[invalid-argument-type]
[docs]
143 def posthook(self, _m='WARNING: user tampered with asyncutils module state\n', _=C.pdb, _e=E.StateCorrupted('console-internal', "attribute 'exc' of console was set to a non-SystemExit exception")):
144 if R.unsetc() is not self: S.stderr.write(_m); del S.modules[__name__]
145 if _ and isinstance(e := self.exc, BaseException):
146 if not isinstance(e, SystemExit): raise _e
147 if (t := e.__traceback__) is None: raise e
148 __import__('pdb').post_mortem(t)
149 super().posthook()
[docs]
150 def showtraceback(self, s=3, a=('asyncutils\\console.py', 'asyncutils/console.py'), f=39, m=S.intern('__callback')):
151 t, v, b = S.exc_info()
152 if b is None: return
153 try:
154 for _ in repeat(None, s):
155 if (b := b.tb_next) is None: break
156 else:
157 if (c := b.tb_frame.f_code).co_filename.endswith(a) and c.co_firstlineno == f and c.co_name == m: b = b.tb_next
158 if b is not None: self._showtraceback(t, v, b, '')
159 finally: t = v = b = None
160 P.patch_method_signatures((showtraceback, ''), (posthook, ''), (prehook, 'max_memerrs'), (write_special, 'msg'))
161del _f, _s, g, C, V, B, _, iscoroutine, E