Python标准库阅读系列—OS库(三)

os库阅读系列的最后一部分。咕咕咕~
def get_exec_path(env=None):
    import warnings

    if env is None:
        env = environ

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", BytesWarning)

        try:
            path_list = env.get('PATH')
        except TypeError:
            path_list = None

        if supports_bytes_environ:
            try:
                path_listb = env[b'PATH']
            except (KeyError, TypeError):
                pass
            else:
                if path_list is not None:
                    raise ValueError(
                        "env cannot contain 'PATH' and b'PATH' keys")
                path_list = path_listb

            if path_list is not None and isinstance(path_list, bytes):
                path_list = fsdecode(path_list)

    if path_list is None:
        path_list = defpath
    return path_list.split(pathsep)

get_exec_path(env=None) :返回将用于搜索可执行文件的目录列表,与在外壳程序中启动一个进程时相似。指定的 env 应为用于搜索 PATH 的环境变量字典。默认情况下,当 env 为 None 时,将会使用 environ 。

首先导入warnings库,它控制警告消息。再之后判断env是否为空,若为空就将os.environ赋给env。

os.environ:一个字符串所对应环境的映像对象。举个例子来说,environ[‘HOME’]就代表了当前这个用户的主目录。

在Windows系统下
在Linux系统下

再之后warnings.catch_warnings():暂时抑制警告。

warnings.simplefilter():在警告过滤器规格列表中插入一个简单的条目。函数参数的含义与相同 。插入警告过滤规则。filterwarnings(),但是不需要正则表达式,因为插入的过滤器始终匹配任何模块中的任何消息,只要类别和行号匹配即可。

再之后获取当前程序目录。

os.supports_bytes_environ():如果操作系统上原生环境类型是字节型则为 True (例如在 Windows 上为 False)。

在Windows系统中
在Linux系统中

判断操作系统的原生环境是否为Linux。若是Windows操作系统,获取程序运行目录。放入path_list,再将path_list以文件系统编码方式解码。

再判断path_list是否为空,若为空。则给予默认搜索路径(defpath)。最后已经os.pathsep为分隔符分割path_list返回。

from _collections_abc import MutableMapping, Mapping

class _Environ(MutableMapping):
    def __init__(self, data, encodekey, decodekey, encodevalue, decodevalue):
        self.encodekey = encodekey
        self.decodekey = decodekey
        self.encodevalue = encodevalue
        self.decodevalue = decodevalue
        self._data = data

    def __getitem__(self, key):
        try:
            value = self._data[self.encodekey(key)]
        except KeyError:
            # raise KeyError with the original key value
            raise KeyError(key) from None
        return self.decodevalue(value)

    def __setitem__(self, key, value):
        key = self.encodekey(key)
        value = self.encodevalue(value)
        putenv(key, value)
        self._data[key] = value

    def __delitem__(self, key):
        encodedkey = self.encodekey(key)
        unsetenv(encodedkey)
        try:
            del self._data[encodedkey]
        except KeyError:
            # raise KeyError with the original key value
            raise KeyError(key) from None

    def __iter__(self):
        # list() from dict object is an atomic operation
        keys = list(self._data)
        for key in keys:
            yield self.decodekey(key)

    def __len__(self):
        return len(self._data)

    def __repr__(self):
        return 'environ({{{}}})'.format(', '.join(
            ('{!r}: {!r}'.format(self.decodekey(key), self.decodevalue(value))
            for key, value in self._data.items())))

    def copy(self):
        return dict(self)

    def setdefault(self, key, value):
        if key not in self:
            self[key] = value
        return self[key]

    def __ior__(self, other):
        self.update(other)
        return self

    def __or__(self, other):
        if not isinstance(other, Mapping):
            return NotImplemented
        new = dict(self)
        new.update(other)
        return new

    def __ror__(self, other):
        if not isinstance(other, Mapping):
            return NotImplemented
        new = dict(other)
        new.update(self)
        return new

接下来自定义了一个类_Environ继承自抽象基类MutableMapping,同时定义了一系列的如获取(__getitem__)、添加(__setitme__)等方法。将在后面用到。

def _createenviron():
    if name == 'nt':
        # Where Env Var Names Must Be UPPERCASE
        def check_str(value):
            if not isinstance(value, str):
                raise TypeError("str expected, not %s" % type(value).__name__)
            return value
        encode = check_str
        decode = str
        def encodekey(key):
            return encode(key).upper()
        data = {}
        for key, value in environ.items():
            data[encodekey(key)] = value
    else:
        # Where Env Var Names Can Be Mixed Case
        encoding = sys.getfilesystemencoding()
        def encode(value):
            if not isinstance(value, str):
                raise TypeError("str expected, not %s" % type(value).__name__)
            return value.encode(encoding, 'surrogateescape')
        def decode(value):
            return value.decode(encoding, 'surrogateescape')
        encodekey = encode
        data = environ
    return _Environ(data,
        encodekey, decode,
        encode, decode)

# unicode environ
environ = _createenviron()
del _createenviron

利用_createenviron()函数获取系统环境变量,这里利用到了上面定义的类_Environ。最后将_createeviron重命名为environ,删除_createeviron。(想不明白为啥要这样做,这不显得多此一举吗?)

Windows下
def getenv(key, default=None):
    return environ.get(key, default)

supports_bytes_environ = (name != 'nt')
__all__.extend(("getenv", "supports_bytes_environ"))

if supports_bytes_environ:
    def _check_bytes(value):
        if not isinstance(value, bytes):
            raise TypeError("bytes expected, not %s" % type(value).__name__)
        return value

    # bytes environ
    environb = _Environ(environ._data,
        _check_bytes, bytes,
        _check_bytes, bytes)
    del _check_bytes

    def getenvb(key, default=None):
        return environb.get(key, default)

    __all__.extend(("environb", "getenvb"))

def _fscodec():
    encoding = sys.getfilesystemencoding()
    errors = sys.getfilesystemencodeerrors()

    def fsencode(filename):
        filename = fspath(filename)  # Does type-checking of `filename`.
        if isinstance(filename, str):
            return filename.encode(encoding, errors)
        else:
            return filename

    def fsdecode(filename):
        filename = fspath(filename)  # Does type-checking of `filename`.
        if isinstance(filename, bytes):
            return filename.decode(encoding, errors)
        else:
            return filename

    return fsencode, fsdecode

fsencode, fsdecode = _fscodec()
del _fscodec

首先定义了一个获取指定环境变量值的函数 getenv(),它其实是_Environ类方法get的包装。

Windows中

然后做了一个简单的判断,判断当前系统是否为Linux,将结果存储在supports_bytes_environ。若当前操作系统是Linux则提供两个个获取字节串对象的方法 environb、getenvb()。

在Linux中
在Windows中
在Linux中获取字节串对象

又定义了一个函数_fscodec()将类似路径形式的文件名通过fsencode, fsdecode两个函数返回为bytes或者str类型的。该函数先将类似路径的形式文件名变成路径的文件系统表示,在返回符合需求的对象。

在Windows中
if _exists("fork") and not _exists("spawnv") and _exists("execv"):

    P_WAIT = 0
    P_NOWAIT = P_NOWAITO = 1

    __all__.extend(["P_WAIT", "P_NOWAIT", "P_NOWAITO"])

    def _spawnvef(mode, file, args, env, func):
        if not isinstance(args, (tuple, list)):
            raise TypeError('argv must be a tuple or a list')
        if not args or not args[0]:
            raise ValueError('argv first element cannot be empty')
        pid = fork()
        if not pid:
            # Child
            try:
                if env is None:
                    func(file, args)
                else:
                    func(file, args, env)
            except:
                _exit(127)
        else:
            # Parent
            if mode == P_NOWAIT:
                return pid # Caller is responsible for waiting!
            while 1:
                wpid, sts = waitpid(pid, 0)
                if WIFSTOPPED(sts):
                    continue

                return waitstatus_to_exitcode(sts)

    def spawnv(mode, file, args):
        return _spawnvef(mode, file, args, None, execv)

    def spawnve(mode, file, args, env):
        return _spawnvef(mode, file, args, env, execve)

    def spawnvp(mode, file, args):
        return _spawnvef(mode, file, args, None, execvp)

    def spawnvpe(mode, file, args, env):
        return _spawnvef(mode, file, args, env, execvpe)


    __all__.extend(["spawnv", "spawnve", "spawnvp", "spawnvpe"])


if _exists("spawnv"):

    def spawnl(mode, file, *args):
        return spawnv(mode, file, args)

    def spawnle(mode, file, *args):
        env = args[-1]
        return spawnve(mode, file, args[:-1], env)


    __all__.extend(["spawnl", "spawnle"])


if _exists("spawnvp"):
    def spawnlp(mode, file, *args):
        return spawnvp(mode, file, args)

    def spawnlpe(mode, file, *args):
        env = args[-1]
        return spawnvpe(mode, file, args[:-1], env)


    __all__.extend(["spawnlp", "spawnlpe"])

首先通过检查函数fork是否存在来判断当前原生操作系统是否为Linux,若是则令 P_WAIT=0 。接下来利用 _spawnvef() 实现spawn*族函数的核心部分,实现执行任何可执行的文件。sopawn*族函数通过给 _spawnvef() 隐式传入不同的参数,最大程度的实现了代码复用节约资源,在这之中用到了exec*族函数传入不同形式的可执行文件。(想不明白,为什么明明Windows中没有fork函数,那么在Windows中应该无法调用spawn*族函数,但是事实与其不符,Windows中能调用所有spawn*族函数)

在Windows中
# Supply os.popen()
def popen(cmd, mode="r", buffering=-1):
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess, io
    if mode == "r":
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdout=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
    else:
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdin=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdin), proc)

# Helper for popen() -- a proxy for a file whose close waits for the process
class _wrap_close:
    def __init__(self, stream, proc):
        self._stream = stream
        self._proc = proc
    def close(self):
        self._stream.close()
        returncode = self._proc.wait()
        if returncode == 0:
            return None
        if name == 'nt':
            return returncode
        else:
            return returncode << 8  # Shift left to match old behavior
    def __enter__(self):
        return self
    def __exit__(self, *args):
        self.close()
    def __getattr__(self, name):
        return getattr(self._stream, name)
    def __iter__(self):
        return iter(self._stream)

# Supply os.fdopen()
def fdopen(fd, *args, **kwargs):
    if not isinstance(fd, int):
        raise TypeError("invalid fd type (%s, expected integer)" % type(fd))
    import io
    return io.open(fd, *args, **kwargs)

def _fspath(path):
    if isinstance(path, (str, bytes)):
        return path

    path_type = type(path)
    try:
        path_repr = path_type.__fspath__(path)
    except AttributeError:
        if hasattr(path_type, '__fspath__'):
            raise
        else:
            raise TypeError("expected str, bytes or os.PathLike object, "
                            "not " + path_type.__name__)
    if isinstance(path_repr, (str, bytes)):
        return path_repr
    else:
        raise TypeError("expected {}.__fspath__() to return str or bytes, "
                        "not {}".format(path_type.__name__,
                                        type(path_repr).__name__))

if not _exists('fspath'):
    fspath = _fspath
    fspath.__name__ = "fspath"

class PathLike(abc.ABC):

    """Abstract base class for implementing the file system path protocol."""

    @abc.abstractmethod
    def __fspath__(self):
        """Return the file system path representation of the object."""
        raise NotImplementedError

    @classmethod
    def __subclasshook__(cls, subclass):
        if cls is PathLike:
            return _check_methods(subclass, '__fspath__')
        return NotImplemented

    __class_getitem__ = classmethod(GenericAlias)

popen()函数传入命令和模式权限(读、写)以及缓冲区大小从一个命令中打开一个管道。该函数首先进行传入参数有效性的判断,要求缓冲区必须存在。再然后根据模式权限的不同传入不同的参数给 subprocess.Popen() 函数用于创建不同的进程,再包装返回一个自定义的 _wrap_close 类对象。os.popen() 函数其实就是对 subprocess.Popen() 函数的封装(牺牲速度换取便利性)。

fdopen() 函数就是对io.open() 函数的封装,没什么好讲的。(这要看不懂就放弃吧)

_fspath() 函数将类似路径的对象转化为str或者bytes类型对象。该函数首先判断路径对象是否为str或者bytes 对象,若不是则进行一些列的转换,转化后再进行一次判断,若是则返回,不是就报错。

接下来就是给_fspath改个名,改成fspath(这个操作蛮多的,不过这里没看懂什么意思)。

最后定义了PathLink接口。

if name == 'nt':
    class _AddedDllDirectory:
        def __init__(self, path, cookie, remove_dll_directory):
            self.path = path
            self._cookie = cookie
            self._remove_dll_directory = remove_dll_directory
        def close(self):
            self._remove_dll_directory(self._cookie)
            self.path = None
        def __enter__(self):
            return self
        def __exit__(self, *args):
            self.close()
        def __repr__(self):
            if self.path:
                return "<AddedDllDirectory({!r})>".format(self.path)
            return "<AddedDllDirectory()>"

    def add_dll_directory(path):
        import nt
        cookie = nt._add_dll_directory(path)
        return _AddedDllDirectory(
            path,
            cookie,
            nt._remove_dll_directory
        )

判断当前系统是否为Windows,若是则定义类_AddedDllDirectory用于存储dll搜索路径。同时定义了一个函数add_dll_directory() 用于添加。

***************************************************************************************************

终于写完了,拖了。。。好久的东西(大概大半年?哎拖延症有点凶啊)。咕咕咕

接下来写点渗透吧,还是挖赏金有意思。

留下评论