[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: streaming.py
File is not writable. Editing disabled.
# # This file is part of pyasn1 software. # # Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> # License: https://pyasn1.readthedocs.io/en/latest/license.html # import io import os from pyasn1 import error from pyasn1.type import univ class CachingStreamWrapper(io.IOBase): """Wrapper around non-seekable streams. Note that the implementation is tied to the decoder, not checking for dangerous arguments for the sake of performance. The read bytes are kept in an internal cache until setting _markedPosition which may reset the cache. """ def __init__(self, raw): self._raw = raw self._cache = io.BytesIO() self._markedPosition = 0 def peek(self, n): result = self.read(n) self._cache.seek(-len(result), os.SEEK_CUR) return result def seekable(self): return True def seek(self, n=-1, whence=os.SEEK_SET): # Note that this not safe for seeking forward. return self._cache.seek(n, whence) def read(self, n=-1): read_from_cache = self._cache.read(n) if n != -1: n -= len(read_from_cache) if not n: # 0 bytes left to read return read_from_cache read_from_raw = self._raw.read(n) self._cache.write(read_from_raw) return read_from_cache + read_from_raw @property def markedPosition(self): """Position where the currently processed element starts. This is used for back-tracking in SingleItemDecoder.__call__ and (indefLen)ValueDecoder and should not be used for other purposes. The client is not supposed to ever seek before this position. """ return self._markedPosition @markedPosition.setter def markedPosition(self, value): # By setting the value, we ensure we won't seek back before it. # `value` should be the same as the current position # We don't check for this for performance reasons. self._markedPosition = value # Whenever we set _marked_position, we know for sure # that we will not return back, and thus it is # safe to drop all cached data. if self._cache.tell() > io.DEFAULT_BUFFER_SIZE: self._cache = io.BytesIO(self._cache.read()) self._markedPosition = 0 def tell(self): return self._cache.tell() def asSeekableStream(substrate): """Convert object to seekable byte-stream. Parameters ---------- substrate: :py:class:`bytes` or :py:class:`io.IOBase` or :py:class:`univ.OctetString` Returns ------- : :py:class:`io.IOBase` Raises ------ : :py:class:`~pyasn1.error.PyAsn1Error` If the supplied substrate cannot be converted to a seekable stream. """ if isinstance(substrate, io.BytesIO): return substrate elif isinstance(substrate, bytes): return io.BytesIO(substrate) elif isinstance(substrate, univ.OctetString): return io.BytesIO(substrate.asOctets()) try: if substrate.seekable(): # Will fail for most invalid types return substrate else: return CachingStreamWrapper(substrate) except AttributeError: raise error.UnsupportedSubstrateError( "Cannot convert " + substrate.__class__.__name__ + " to a seekable bit stream.") def isEndOfStream(substrate): """Check whether we have reached the end of a stream. Although it is more effective to read and catch exceptions, this function Parameters ---------- substrate: :py:class:`IOBase` Stream to check Returns ------- : :py:class:`bool` """ if isinstance(substrate, io.BytesIO): cp = substrate.tell() substrate.seek(0, os.SEEK_END) result = substrate.tell() == cp substrate.seek(cp, os.SEEK_SET) yield result else: received = substrate.read(1) if received is None: yield if received: substrate.seek(-1, os.SEEK_CUR) yield not received def peekIntoStream(substrate, size=-1): """Peek into stream. Parameters ---------- substrate: :py:class:`IOBase` Stream to read from. size: :py:class:`int` How many bytes to peek (-1 = all available) Returns ------- : :py:class:`bytes` or :py:class:`str` The return type depends on Python major version """ if hasattr(substrate, "peek"): received = substrate.peek(size) if received is None: yield while len(received) < size: yield yield received else: current_position = substrate.tell() try: for chunk in readFromStream(substrate, size): yield chunk finally: substrate.seek(current_position) def readFromStream(substrate, size=-1, context=None): """Read from the stream. Parameters ---------- substrate: :py:class:`IOBase` Stream to read from. Keyword parameters ------------------ size: :py:class:`int` How many bytes to read (-1 = all available) context: :py:class:`dict` Opaque caller context will be attached to exception objects created by this function. Yields ------ : :py:class:`bytes` or :py:class:`str` or :py:class:`SubstrateUnderrunError` Read data or :py:class:`~pyasn1.error.SubstrateUnderrunError` object if no `size` bytes is readily available in the stream. The data type depends on Python major version Raises ------ : :py:class:`~pyasn1.error.EndOfStreamError` Input stream is exhausted """ while True: # this will block unless stream is non-blocking received = substrate.read(size) if received is None: # non-blocking stream can do this yield error.SubstrateUnderrunError(context=context) elif not received and size != 0: # end-of-stream raise error.EndOfStreamError(context=context) elif len(received) < size: substrate.seek(-len(received), os.SEEK_CUR) # behave like a non-blocking stream yield error.SubstrateUnderrunError(context=context) else: break yield received
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: premium707.web-hosting.com
Server IP: 198.177.120.115
PHP Version: 8.1.34
Server Software: LiteSpeed
System: Linux premium707.web-hosting.com 4.18.0-553.45.1.lve.el8.x86_64 #1 SMP Wed Mar 26 12:08:09 UTC 2025 x86_64
HDD Total: 97.87 GB
HDD Free: 76.3 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Enabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes (py3)
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: urbaoubp
User ID (UID): 1252
Group ID (GID): 1257
Script Owner UID: 1252
Current Dir Owner: N/A