做一个字节码追踪器,从内部理解 Python 的执行过程

本文由 伯乐在线jasper 翻译,黄利民 校稿。未经许可,禁止转载!
英文出处:blog.hakril.net。欢迎加入翻译组

最近我在研究 Python 的执行模型。我对 Python 内部的东西挺好奇,比如:类似 YIELDVALUE 和 YIELDFROM 此类操作码的实现;列表表达式、生成器表达式以及一些有趣的Python 特性是怎么编译的;异常触发之时,字节码层面发生了什么。

阅读 CPython 代码是相当有益的,但是我觉得要完全理解字节码的执行和堆栈的变化,光读源码是远远不够的。GDB 是个好选择,但我很懒,只想写一些高级的接口和 Python 代码。

因此我想做一个字节码级别的追踪 API,就像 sys.settrace 所提供的那样,但颗粒度更出色。这种练习完美地锻炼了我将 C 转化为 Python 的能力。我们所需的有以下几点:

  • 一个新的CPython解释器操作码
  • 一种将操作码注入Python字节码的方法
  • 一些Python代码,用于在Python的角度处理操作码

注:在这篇文章中,Python版本是3.5

一种新的CPython操作码

我们的新操作码:DEBUG_OP

这个新的操作码DEBUG_OP是我第一次尝试用C代码来实现CPython。我会尽量使之保持简洁。

我想要达到的目的是,无论我的操作码何时执行,都有一种方式调用一些Python代码,与此同时,我们也想能够追踪一些与执行上下文有关的数据。我们的操作码会把这些信息当作参数传递给我们的回调函数。我能辨识出的有用信息如下:

  • 堆栈的内容
  • 执行DEBUG_OP的帧对象信息

因此我们 DEBUG_OP 所需做的所有事情是:

  • 找到回调函数
  • 创建堆栈内容的列表
  • 调用回调函数,并将堆栈列表和当前帧作为参数传给它

听起来挺简单啊,让我们开始吧!

声明:以下的解释和代码都是经过大量段错误得到的。首先要做的事情,就是给我们的操作码命名并赋值,因此我们需要在Include/opcode.h中添加

Python

1
2
3
4
5
6
7
8
9
10
11
12
/** My own comments begin by ‘**’ **/
/** From: Includes/opcode.h **/
 
/* Instruction opcodes for compiled code */
 
/** We just have to define our opcode with a free value
    0 was the first one I found **/
#define DEBUG_OP                0
 
#define POP_TOP                 1
#define ROT_TWO                 2
#define ROT_THREE               3


这简单的部分是完成了,现在我们必须真正去编写我们的操作码。

实现 DEBUG_OP

在考虑实现DEBUG_OP之前,我们需要问我们自己的第一个问题是:“我的接口应该是什么样的?”

拥有一个可以调用其他代码的新操作码是很酷的,但是它实际上会调用哪些代码呢?这个操作码怎么找到回调函数呢?我选择了一种看起来最简单的解决方案,在帧的全局区域写死函数名。

现在问题就变成了:“我怎么从一个字典中找到一个不变的C字符串?”

为了回答这个问题,我们可以寻找一些用在Python的main循环中的用到的和上下文管理相关的标识符**enter**和**exit**。

我们可以看到标识符被用在 SETUP_WITH 操作码中。

Python

1
2
3
4
5
6
7
/** From: Python/ceval.c **/
TARGET(SETUP_WITH) {
_Py_IDENTIFIER(__exit__);
_Py_IDENTIFIER(__enter__);
PyObject *mgr = TOP();
PyObject *exit = special_lookup(mgr, &PyId___exit__), *enter;
PyObject *res;

现在,看一下_Py_IDENTIFIER 的宏定义:

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** From: Include/object.h **/
 
/********************* String Literals ****************************************/
/* This structure helps managing static strings. The basic usage goes like this:
   Instead of doing
 
       r = PyObject_CallMethod(o, “foo”, “args”, ...);
 
   do
 
       _Py_IDENTIFIER(foo);
       ...
       r = _PyObject_CallMethodId(o, &PyId_foo, “args”, ...);
 
   PyId_foo is a static variable, either on block level or file level. On first
   usage, the string “foo” is interned, and the structures are linked. On interpreter
   shutdown, all strings are released (through _PyUnicode_ClearStaticStrings).
 
   Alternatively, _Py_static_string allows to choose the variable name.
   _PyUnicode_FromId returns a borrowed reference to the interned string.
   _PyObject_{Get,Set,Has}AttrId are __getattr__ versions using _Py_Identifier*.
*/
typedef struct _Py_Identifier {
    struct _Py_Identifier *next;
    const char* string;
    PyObject *object;
} _Py_Identifier;
 
#define _Py_static_string_init(value) { 0, value, 0 }
#define _Py_static_string(varname, value)  static _Py_Identifier varname = _Py_static_string_init(value)
#define _Py_IDENTIFIER(varname) _Py_static_string(PyId_##varname, #varname)

很好,至少注释部分已经说明得很清楚了。通过一番查找,我们发现了可以用来从字典找固定字符串的函数 _PyDict_GetItemId,所以我们操作码的查找部分的代码就是这样的:

Python

1
2
3
4
5
6
7
8
9
10
/** Our callback function will be named op_target **/
PyObject *target = NULL;
_Py_IDENTIFIER(op_target);
target = _PyDict_GetItemId(f->f_globals, &PyId_op_target);
if (target == NULL && _PyErr_OCCURRED()) {
    if (!PyErr_ExceptionMatches(PyExc_KeyError))
        goto error;
    PyErr_Clear();
    DISPATCH();
}

为了方便理解,我来解释一下这段代码:

  • f 是当前的帧,f->f_globals 是它的全局区域
  • 如果我们没有找到 op_target,我们需要检查这个异常是不是 KeyError
  • goto error; 是一种在 main-loop 中抛出异常的方法
  • PyErr_Clear() 抑制了当前异常,DISPATCH() 触发了下一个操作码的执行下一步是收集我们想要的堆栈信息。

    Python

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /** This code create a list with all the values on the current stack **/
    PyObject *value = PyList_New(0);
    for (i = 1 ; i <= STACK_LEVEL(); i++) {
        tmp = PEEK(i);
        if (tmp == NULL) {
            tmp = Py_None;
        }
        PyList_Append(value, tmp);
    }



    最后一步是调用回调函数,我们需要使用 call_function,通过研究操作码 CALL_FUNCTION 来学习怎么使用 call_function。

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** From: Python/ceval.c **/
TARGET(CALL_FUNCTION) {
    PyObject **sp, *res;
    /** stack_pointer is a local of the main loop.
        Its the pointer to the stacktop of our frame **/
    sp = stack_pointer;
    res = call_function(&sp, oparg);
    /** call_function handles the args it consummed on the stack for us **/
    stack_pointer = sp;
    PUSH(res);
    /** Standard exception handling **/
    if (res == NULL)
        goto error;
    DISPATCH();
}

有了这些信息,我们就能够精心地完成 DEBUG_OP:

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
TARGET(DEBUG_OP) {
    PyObject *value = NULL;
    PyObject *target = NULL;
    PyObject *res = NULL;
    PyObject **sp = NULL;
    PyObject *tmp;
    int i;
    _Py_IDENTIFIER(op_target);
 
    target = _PyDict_GetItemId(f->f_globals, &PyId_op_target);
    if (target == NULL && _PyErr_OCCURRED()) {
        if (!PyErr_ExceptionMatches(PyExc_KeyError))
            goto error;
        PyErr_Clear();
        DISPATCH();
    }
    value = PyList_New(0);
    Py_INCREF(target);
    for (i = 1 ; i <= STACK_LEVEL(); i++) {
        tmp = PEEK(i);
        if (tmp == NULL)
            tmp = Py_None;
        PyList_Append(value, tmp);
    }
 
    PUSH(target);
    PUSH(value);
    Py_INCREF(f);
    PUSH(f);
    sp = stack_pointer;
    res = call_function(&sp, 2);
    stack_pointer = sp;
    if (res == NULL)
        goto error;
    Py_DECREF(res);
    DISPATCH();
}

因为我在编写 CPython 实现 C 代码方面没有太多的经验,,所以我可能漏掉了一些(我期待你的反馈)

编译通过!完成了!

看起来一切顺利,但是当我们尝试去执行 DEBUG_OP 时却失败了。自 2008 年以来,Python 使用事先完成的 GOTO(你可以从这里读取更多信息),因此我们需要更新下 goto jump table,我们仅需要在 Python/opcode_targets.h 中做如下修改:

Python

1
2
3
4
5
6
7
/** From: Python/opcode_targets.h **/
/** Easy change since DEBUG_OP is the opcode number 1 **/
static void *opcode_targets[256] = {
    //&&_unknown_opcode,
    &&TARGET_DEBUG_OP,
    &&TARGET_POP_TOP,
    /** ... **/

搞定了,现在我们拥有一个全新的可以工作的操作码,唯一的问题是,我们的操作码永远不会被调用,因为不存在于编译好的字节码中。现在我们需要在一些函数的字节码中注入 DEBUG_OP。

将操作码 DEBUG_OP 注入到 Python 字节码中

下面是一些把新的操作码插入 Python 字节码中的方法。

  • 我们可以像 Quarkslab 那样用 peephole optimizer
  • 我们可以在生成字节码时做些改变
  • 我们可以仅仅修改一些运行时的函数的字节码(这其实就是我们将要做的)

为了编写出新的操作码,有了上面的C代码就足够了,让我们回到起点,理解奇怪而神奇的Python!

So, what we are going to do is:

因此,我们将要做下面这些事儿:

  • 得到我们想要追踪的code object
  • 重写字节码来注入DEBUG_OP
  • 将新的code object替换回去

关于 code object 的提示

如果你听说过 code object,在我第一篇文章里有一点介绍。在网上也有一些相关文档,可以直接用 Ctrl+F 查找“code objects”

在这篇文章中,还有一件需要注意的事情是,code objects不能改变:

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Python 3.4.2 (default, Oct  8 2014, 10:45:20)
[GCC 4.9.1] on linux
Type “help”, “copyright”, “credits” or “license” for more information.
>>> x = lambda y : 2
>>> x.__code__
<code object <lambda> at 0x7f481fd88390, file , line 1>
>>> x.__code__.co_name
>>> x.__code__.co_name = ‘truc’
Traceback (most recent call last):
  File , line 1, in <module>
AttributeError: readonly attribute
>>> x.__code__.co_consts = (‘truc’,)
Traceback (most recent call last):
  File , line 1, in <module>
AttributeError: readonly attribute

但是不用担心,我们会找到方法绕过这个问题。

所用工具

为了修改这些字节码,我们将需要一些工具:

  • dist模块用来反编译和分析字节码
  • dis.Bytecode是Python3.4的新特性,对于反编译和分析字节码特别有用
  • 简单修改code object的工具dis.

dis.Bytecode反编译一个code object,可以给我们一些关于操作码,参数和上下文有用的信息。

Python

1
2
3
4
5
6
7
8
9
# Python3.4
>>> import dis
>>> f = lambda x: x + 3
>>> for i in dis.Bytecode(f.__code__): print (i)
...
Instruction(opname=‘LOAD_FAST’, opcode=124, arg=0, argval=‘x’, argrepr=‘x’, offset=0, starts_line=1, is_jump_target=False)
Instruction(opname=‘LOAD_CONST’, opcode=100, arg=1, argval=3, argrepr=‘3’, offset=3, starts_line=None, is_jump_target=False)
Instruction(opname=‘BINARY_ADD’, opcode=23, arg=None, argval=None, argrepr=, offset=6, starts_line=None, is_jump_target=False)
Instruction(opname=‘RETURN_VALUE’, opcode=83, arg=None, argval=None, argrepr=, offset=7, starts_line=None, is_jump_target=False)

为了能够修改code objects,我创建了一个class,用来复制code object,并允许根据我们的需要修改相应的值,然后生成新的code object。

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MutableCodeObject(object):
    args_name = (“co_argcount”, “co_kwonlyargcount”, “co_nlocals”, “co_stacksize”, “co_flags”, “co_code”,
                  “co_consts”, “co_names”, “co_varnames”, “co_filename”, “co_name”, “co_firstlineno”,
                   “co_lnotab”, “co_freevars”, “co_cellvars”)
 
    def __init__(self, initial_code):
        self.initial_code = initial_code
        for attr_name in self.args_name:
            attr = getattr(self.initial_code, attr_name)
            if isinstance(attr, tuple):
                attr = list(attr)
            setattr(self, attr_name, attr)
 
    def get_code(self):
        args = []
        for attr_name in self.args_name:
            attr = getattr(self, attr_name)
            if isinstance(attr, list):
                attr = tuple(attr)
            args.append(attr)
        return self.initial_code.__class__(*args)

很容易使用,并解决了上面说的 code object 不可变的问题

Python

1
2
3
4
5
6
7
8
9
10
>>> x = lambda y : 2
>>> m = MutableCodeObject(x.__code__)
>>> m
<new_code.MutableCodeObject object at 0x7f3f0ea546a0>
>>> m.co_consts
[None, 2]
>>> m.co_consts[1] = ‘3’
>>> m.co_name = ‘truc’
>>> m.get_code()
<code object truc at 0x7f3f0ea2bc90, file , line 1>

测试新的操作码

现在我们有了注入DEBUG_OP的基本工具,我们来验证实现是否可用。

将操作码加入到一个最简单的函数中:

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from new_code import MutableCodeObject
 
def op_target(*args):
    print(“WOOT”)
    print(“op_target called with args <{0}>“.format(args))
 
def nop():
    pass
 
new_nop_code = MutableCodeObject(nop.__code__)
new_nop_code.co_code = b“\x00” + new_nop_code.co_code[0:3] + b“\x00” + new_nop_code.co_code[1:]
new_nop_code.co_stacksize += 3
 
nop.__code__ = new_nop_code.get_code()
 
import dis
dis.dis(nop)
nop()
 
# Don’t forget that ./python is our custom Python implementing DEBUG_OP
hakril@computer ~/python/CPython3.5 % ./python proof.py
  8           0 <0>
              1 LOAD_CONST               0 (None)
              4 <0>
              5 RETURN_VALUE
WOOT
op_target called with args <([], <frame object at 0x7fde9eaebdb0>)>
WOOT
op_target called with args <([None], <frame object at 0x7fde9eaebdb0>)>

好像成功了!有一行代码需要解释一下:new_nop_code.co_stacksize += 3:

  • Co_stacksize表示code object所需的堆栈大小
  • DEBUG_OP增加了3个值到堆栈中,因此我们需要增加预留空间

现在我们可以将我们的操作码注入到每一个Python函数中了!

重写字节码

就像我们在上一个例子中看到的,重写Python字节码听起来很简单!为了在每一操作码之间注入DEBUG _OP,所有我们必须获取每一个操作码的偏移量(把我们操作码注入到参数上是有问题的),然后将操作码注入到这些偏移量中。偏移量很容易获取,使用dis.Bytecode就行。

如下所示:

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def add_debug_op_everywhere(code_obj):
    # We get every instruction offset in the code object
    offsets = [instr.offset for instr in dis.Bytecode(code_obj)]
    # And insert a DEBUG_OP at every offset
    return insert_op_debug_list(code_obj, offsets)
 
def insert_op_debug_list(code, offsets):
    # We insert the DEBUG_OP one by one
    for nb, off in enumerate(sorted(offsets)):
        # Need to ajust the offsets by the number of opcodes already inserted before
        # That’s why we sort our offsets!
        code = insert_op_debug(code, off + nb)
    return code
 
# Last problem: what does insert_op_debug looks like?

基于上面的例子,有人可能会认为我们的insert_op_debug会在指定的偏移量增加一个”x00″,这是个坑啊!在第一个 DEBUG_OP 注入的例子中,被注入的函数是没有任何分支的,为了使 insert_op_debug 有完美的功能,我们需要考虑到存在分支操作码的情况。

Python 的分支一共有两种:

  • 绝对分支:看起来是这样的 Instruction_Pointer = argument(instruction)
  • 相对分支:看起来是这样的 Instruction_Pointer += argument(instruction)
  • 相对分支总是向前的

我们希望这些分支在插入操作码之后仍然能够正常工作,为此我们需要修改一些指令参数。以下是我用的逻辑:

对于每一个在插入偏移量之前的相对分支而言:

  • 如果目标地址是严格大于我们的插入偏移量,将指令参数增加 1
  • 如果相等,则不需要增加 1 就能够在跳转操作和目标地址之间执行DEBUG_OP
  • 如果小于,插入DEBUG_OP并不会影响到跳转操作和目标地址之间的距离

对于 code object 中的每一个绝对分支而言

  • 如果目标地址是严格大于我们的插入偏移量的话,将指令参数增加 1
  • 如果相等,那么不需要任何修改,理由和相对分支部分是一样的
  • 如果小于,插入DEBUG_OP并不会影响到跳转操作和目标地址之间的距离

下面是实现:

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Helper
def bytecode_to_string(bytecode):
    if bytecode.arg is not None:
        return struct.pack(“<Bh”, bytecode.opcode, bytecode.arg)
    return struct.pack(“<B”, bytecode.opcode)
 
# Dummy class for bytecode_to_string
class DummyInstr:
    def __init__(self, opcode, arg):
        self.opcode = opcode
        self.arg = arg
 
def insert_op_debug(code, offset):
    opcode_jump_rel = [‘FOR_ITER’, ‘JUMP_FORWARD’, ‘SETUP_LOOP’, ‘SETUP_WITH’, ‘SETUP_EXCEPT’, ‘SETUP_FINALLY’]
    opcode_jump_abs = [‘POP_JUMP_IF_TRUE’, ‘POP_JUMP_IF_FALSE’, ‘JUMP_ABSOLUTE’]
    res_codestring = b“”
    inserted = False
    for instr in dis.Bytecode(code):
        if instr.offset == offset:
            res_codestring += b“x00”
            inserted = True
        if instr.opname in opcode_jump_rel and not inserted: #relative jump are always forward
            if offset &lt; instr.offset + 3 + instr.arg: # inserted beetwen jump and dest: add 1 to dest (3 for size)
                #If equal: jump on DEBUG_OP to get info before exec instr
                res_codestring += bytecode_to_string(DummyInstr(instr.opcode, instr.arg + 1))
                continue
        if instr.opname in opcode_jump_abs:
            if instr.arg &gt; offset:
                res_codestring += bytecode_to_string(DummyInstr(instr.opcode, instr.arg + 1))
                continue
        res_codestring += bytecode_to_string(instr)
    # replace_bytecode just replaces the original code co_code
    return replace_bytecode(code, res_codestring)

我们可以看到结果如下:

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
>>> def lol(x):
...     for i in range(10):
...         if x == i:
...             break
 
>>> dis.dis(lol)
101           0 SETUP_LOOP              36 (to 39)
              3 LOAD_GLOBAL              0 (range)
              6 LOAD_CONST               1 (10)
              9 CALL_FUNCTION            1 (1 positional, 0 keyword pair)
             12 GET_ITER
        >>   13 FOR_ITER                22 (to 38)
             16 STORE_FAST               1 (i)
 
102          19 LOAD_FAST                0 (x)
             22 LOAD_FAST                1 (i)
             25 COMPARE_OP               2 (==)
             28 POP_JUMP_IF_FALSE       13
 
103          31 BREAK_LOOP
             32 JUMP_ABSOLUTE           13
             35 JUMP_ABSOLUTE           13
        >>   38 POP_BLOCK
        >>   39 LOAD_CONST               0 (None)
             42 RETURN_VALUE
>>> lol.__code__ = transform_code(lol.__code__, add_debug_op_everywhere, add_stacksize=3)
 
>>> dis.dis(lol)
101           0 <0>
              1 SETUP_LOOP              50 (to 54)
              4 <0>
              5 LOAD_GLOBAL              0 (range)
              8 <0>
              9 LOAD_CONST               1 (10)
             12 <0>
             13 CALL_FUNCTION            1 (1 positional, 0 keyword pair)
             16 <0>
             17 GET_ITER
        >>   18 <0>
 
102          19 FOR_ITER                30 (to 52)
             22 <0>
             23 STORE_FAST               1 (i)
             26 <0>
             27 LOAD_FAST                0 (x)
             30 <0>
 
103          31 LOAD_FAST                1 (i)
             34 <0>
             35 COMPARE_OP               2 (==)
             38 <0>
             39 POP_JUMP_IF_FALSE       18
             42 <0>
             43 BREAK_LOOP
             44 <0>
             45 JUMP_ABSOLUTE           18
             48 <0>
             49 JUMP_ABSOLUTE           18
        >>   52 <0>
             53 POP_BLOCK
        >>   54 <0>
             55 LOAD_CONST               0 (None)
             58 <0>
             59 RETURN_VALUE
 
# Setup the simplest handler EVER
>>> def op_target(stack, frame):
...     print (stack)
 
# GO
>>> lol(2)
[]
[]
[<class ‘range’>]
[10, <class ‘range’>]
[range(0, 10)]
[<range_iterator object at 0x7f1349afab80>]
[0, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[0, 2, <range_iterator object at 0x7f1349afab80>]
[False, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[1, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[1, 2, <range_iterator object at 0x7f1349afab80>]
[False, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[2, 2, <range_iterator object at 0x7f1349afab80>]
[True, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[]
[None]

太棒啦!现在我们知道了如何获取堆栈信息和 Python 中每一个操作对应的帧信息。上面所展示的结果目前而言并不是很实用。在最后一部分中让我们对注入做进一步的封装。

增加 Python 封装

正如您所看到的,所有的底层接口工作正常。我们最后要做的一件事是让 op_target 更加有用(这部分相对而言比较空泛一些,毕竟在我看来这不是整个项目中最有趣的部分)。

首先我们来看一下帧的参数所能提供的信息,如果我们看到帧中存储的信息,我们将会看到下面这些:

  • f_code当前帧将执行的 code object
  • f_lasti当前的操作(code object 中的字节码字符串的索引)

经过我们的处理我们可以得知DEBUG_OP之后要被执行的操作码,这对我们聚合数据并展示是相当有用的。

我们可以新建一个用于追踪函数内部机制的class:

  • 改变函数自身的co_code
  • 设置回调函数作为op_debug的目标函数

一旦我们知道下一个操作,我们就可以分析它并修改它的参数。例如,我们可以增加一个auto-follow-called-functions的特性。

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def op_target(l, f, exc=None):
    if op_target.callback is not None:
        op_target.callback(l, f, exc)
 
class Trace:
    def __init__(self, func):
        self.func = func
 
    def call(self, *args, **kwargs):
        self.add_func_to_trace(self.func)
        # Activate Trace callback for the func call
        op_target.callback = self.callback
        try:
            res = self.func(*args, **kwargs)
        except Exception as e:
            res = e
        op_target.callback = None
        return res
 
    def add_func_to_trace(self, f):
        # Is it code? is it already transformed?
        if not hasattr(f ,“op_debug”) and hasattr(f, “__code__”):
            f.__code__ = transform_code(f.__code__, transform=add_everywhere, add_stacksize=ADD_STACK)
            f.__globals__[‘op_target’] = op_target
            f.op_debug = True
 
    def do_auto_follow(self, stack, frame):
        # Nothing fancy: FrameAnalyser is just the wrapper that gives the next executed instruction
        next_instr = FrameAnalyser(frame).next_instr()
        if “CALL” in next_instr.opname:
            arg = next_instr.arg
            f_index = (arg &amp; 0xff) + (2 * (arg &gt;&gt; 8))
            called_func = stack[f_index]
 
            # If call target is not traced yet: do it
            if not hasattr(called_func, “op_debug”):
                self.add_func_to_trace(called_func)

现在我们必须要做的是,用方法callback和do_report实现一个的子类,其中callback 方法将在每一个操作之后被调用。doreport 方法将我们收集到的信息打印出来。

这是一个伪函数追踪器实现:

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class DummyTrace(Trace):
    def __init__(self, func):
        self.func = func
        self.data = collections.OrderedDict()
        self.last_frame = None
        self.known_frame = []
        self.report = []
 
    def callback(self, stack, frame, exc):
        if frame not in self.known_frame:
            self.known_frame.append(frame)
            self.report.append(” === Entering New Frame {0} ({1}) ===”.format(frame.f_code.co_name, id(frame)))
            self.last_frame = frame
        if frame != self.last_frame:
            self.report.append(” === Returning to Frame {0} {1}===”.format(frame.f_code.co_name, id(frame)))
            self.last_frame = frame
 
        self.report.append(str(stack))
        instr = FrameAnalyser(frame).next_instr()
        offset = str(instr.offset).rjust(8)
        opname = str(instr.opname).ljust(20)
        arg = str(instr.arg).ljust(10)
        self.report.append(“{0}  {1} {2} {3}”.format(offset, opname, arg, instr.argval))
        self.do_auto_follow(stack, frame)
 
    def do_report(self):
        print(“n”.join(self.report))

这里有一些实现的例子和使用方法。格式有些不方便观看,毕竟我并不擅长写这种对用户友好的报告。

例1:自动追踪堆栈信息和已经执行的指令

例2:上下文管理

最后是列表表达式的工作示例

例3:伪追踪器的输出

例4:输出收集的堆栈信息

总结

了解 Python 底层的好方法,解释器的 main 循环,Python 实现的 C 代码编程、Python 字节码,这个项目是一个好方法。同时,也让我们看到了 Python 的一些有趣的构造函数(诸如生成器、上下文管理器和列表表达式)的字节码行为。

这里有完整代码

另外,我们还可以修改所追踪的函数的堆栈。虽然不确定这个是否有用,但一定会充满乐趣。

1
收藏

1 评论





关于作者:jasper


运维开发一枚,关注devops、运维监控、python、开源、大数据、web开发、互联网;


个人主页 ·
我的文章

· 11 ·     

转载自演道,想查看更及时的互联网产品技术热点文章请点击http://go2live.cn