forked from ethanchewy/PythonBuddy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrunpack.py
More file actions
113 lines (95 loc) · 3.48 KB
/
runpack.py
File metadata and controls
113 lines (95 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
""" WARNING! this module is incomplete and may have rough edges. Use only
if necessary
"""
import py
from struct import unpack
from rpython.rlib.rstruct.formatiterator import FormatIterator
from rpython.rlib.rstruct.error import StructError
from rpython.rlib.objectmodel import specialize
class MasterReader(object):
    """Sequential cursor over the string that is being unpacked.

    ``input`` holds the complete string passed to the constructor and
    ``inputpos`` is the offset at which the next read() will start.
    """

    def __init__(self, s):
        self.inputpos = 0
        self.input = s

    def align(self, mask):
        """Move the cursor to ``(inputpos + mask) & ~mask``.

        For a mask of the form ``2**k - 1`` this rounds the position up to
        the next multiple of ``2**k``; an already aligned position is left
        where it is.
        """
        self.inputpos = (self.inputpos + mask) & ~mask

    def read(self, count):
        """Return the next ``count`` characters and advance past them.

        Raises StructError when fewer than ``count`` characters remain; the
        cursor is not moved in that case.
        """
        start = self.inputpos
        stop = start + count
        if stop > len(self.input):
            raise StructError("unpack str size too short for format")
        chunk = self.input[start:stop]
        self.inputpos = stop
        return chunk
class AbstractReader(object):
    """Common base class of the reader classes built by reader_for_pos().

    It defines no attributes or methods of its own.
    """
def reader_for_pos(pos):
    """Build a new reader class for the format item at index ``pos``.

    Every call creates a distinct AbstractReader subclass whose name is
    ``ReaderForPos<pos>``.  Its instances forward reads to a shared master
    reader and keep the value handed to appendobj() in ``self.value``.
    """
    # NOTE(review): one class per position presumably lets each position's
    # ``value`` attribute be typed independently by RPython -- confirm.
    class ReaderForPos(AbstractReader):
        def __init__(self, mr, bigendian):
            self.bigendian = bigendian
            self.mr = mr

        def appendobj(self, value):
            # Only the most recently appended value is remembered.
            self.value = value

        def get_buffer_as_string_maybe(self):
            # Expose the master reader's whole string and current offset.
            return self.mr.input, self.mr.inputpos

        def read(self, count):
            return self.mr.read(count)

        def skip(self, size):
            # XXX, could avoid taking the slice
            self.read(size)

    ReaderForPos.__name__ = 'ReaderForPos%d' % pos
    return ReaderForPos
class FrozenUnpackIterator(FormatIterator):
    """Format iterator that records a format and compiles an unpacker for it.

    operate() and align() collect the items of the format in
    ``self.formats`` as ``(fmtdesc, repetitions, mask)`` triples.
    _freeze_() then generates a dedicated ``unpack(s)`` function from that
    list and stores it on the instance as ``self.unpack``.
    """

    def __init__(self, fmt):
        self.fmt = fmt
        self.formats = []

    def operate(self, fmtdesc, repetitions):
        """Record one format item.

        An item whose ``needcount`` flag is set is stored once together with
        its repetition count; any other item is stored ``repetitions`` times,
        each with a count of 1.  The mask slot starts out as None.
        """
        if fmtdesc.needcount:
            self.formats.append((fmtdesc, repetitions, None))
            return
        self.formats.extend([(fmtdesc, 1, None)] * repetitions)

    def align(self, mask):
        """Store ``mask`` on the most recently recorded item, if there is one."""
        # NOTE(review): the mask is attached to the item recorded *before*
        # this call, and _create_unpacking_func() emits the align() call
        # ahead of that item's unpacker.  Confirm this matches the order in
        # which FormatIterator invokes align() and operate().
        if not self.formats:
            return
        fmtdesc, rep, _ = self.formats[-1]
        self.formats[-1] = (fmtdesc, rep, mask)

    def _create_unpacking_func(self):
        """Generate the specialised ``unpack(s)`` function and install it.

        The generated function creates a MasterReader for ``s``, one reader
        object per recorded item, runs each item's unpacker (preceded by an
        align() call when the item carries a mask) and returns the readers'
        values.  With a single recorded item the parentheses in the return
        statement are plain grouping, so the bare value is returned.
        """
        # The generated code runs with a copy of this module's globals
        # (for MasterReader) extended with the per-item helpers below.
        namespace = dict(globals())
        # NOTE(review): self.bigendian is not set in this class; presumably
        # the base class sets it while interpreting the format -- confirm.
        namespace['bigendian'] = self.bigendian

        steps = []
        for idx, (fmtdesc, rep, mask) in enumerate(self.formats):
            namespace['unpacker%d' % idx] = fmtdesc.unpack
            namespace['reader_cls%d' % idx] = reader_for_pos(idx)
            if mask is not None:
                steps.append('master_reader.align(%d)' % mask)
            if fmtdesc.needcount:
                steps.append('unpacker%d(reader%d, %d)' % (idx, idx, rep))
            else:
                steps.append('unpacker%d(reader%d)' % (idx, idx))

        indices = range(len(self.formats))
        substitutions = {
            'readers': ";".join(
                ["reader%d = reader_cls%d(master_reader, bigendian)" % (i, i)
                 for i in indices]),
            'perform': ";".join(steps),
            'unpackers': ','.join(['reader%d.value' % i for i in indices]),
        }
        source = py.code.Source("""
        def unpack(s):
            master_reader = MasterReader(s)
            %(readers)s
            %(perform)s
            return (%(unpackers)s)
        """ % substitutions)
        # Tuple form of exec: same effect as ``exec code in namespace``.
        exec(source.compile(), namespace)
        self.unpack = namespace['unpack']  # override not-rpython version

    def _freeze_(self):
        """Build the unpacking function and return True.

        At least one format item must have been recorded.
        """
        assert self.formats
        self._create_unpacking_func()
        return True
@specialize.memo()
def create_unpacker(unpack_str):
    """Return a frozen unpacker object for the format ``unpack_str``.

    A FrozenUnpackIterator is created for the format, asked to interpret
    it, and then frozen so that its generated ``unpack`` function is
    installed.  The iterator itself is returned; callers use its
    ``unpack`` attribute.
    """
    fmtiter = FrozenUnpackIterator(unpack_str)
    fmtiter.interpret(unpack_str)
    # Call _freeze_() outside the assert statement: it has the side effect
    # of building the unpack function, and asserts are removed entirely
    # when Python runs with -O, which would silently skip that step.
    frozen = fmtiter._freeze_()
    assert frozen
    return fmtiter
@specialize.arg(0)
def runpack(fmt, input):
    """Unpack the string ``input`` according to the format ``fmt``.

    The unpacker for ``fmt`` is obtained from create_unpacker() and the
    result of its ``unpack`` call on ``input`` is returned unchanged.
    """
    return create_unpacker(fmt).unpack(input)