Tentative progress in stats

This commit is contained in:
Théophile Bastian 2018-07-17 11:36:56 +02:00
parent 3cb2c508a0
commit 216e442f5b
4 changed files with 359 additions and 127 deletions

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
from stats_accu import StatsAccumulator
import gather_stats
import argparse
@ -15,6 +16,9 @@ class Config:
if args.feature == 'gather':
self.output = args.output
elif args.feature == 'sample':
self.size = int(args.size)
elif args.feature == 'analyze':
self.data_file = args.data_file
@ -34,6 +38,19 @@ class Config:
subparsers = parser.add_subparsers(help='Subcommands')
# Sample stats
parser_sample = subparsers.add_parser(
'sample',
help='Same as gather, but for a random subset of files')
parser_sample.set_defaults(feature='sample')
parser_sample.add_argument('--size', '-n',
default=1000,
help=('Pick this number of files'))
parser_sample.add_argument('--output', '-o',
default='elf_data',
help=('Output data to this file. Defaults '
'to "elf_data"'))
# Gather stats
parser_gather = subparsers.add_parser(
'gather',
@ -70,11 +87,17 @@ def main():
if config.feature == 'gather':
stats_accu = gather_stats.gather_system_files(config)
stats_accu.serialize(config.output)
stats_accu.dump(config.output)
elif config.feature == 'sample':
stats_accu = gather_stats.gather_system_files(
config,
sample_size=config.size)
elif config.feature == 'analyze':
# TODO
print("Not implemented", file=sys.stderr)
stats_accu = StatsAccumulator.load(config.data_file)
sys.exit(1)

View File

@ -1,53 +1,120 @@
from pyelftools_overlay import system_elfs
import pathos
from pyelftools_overlay import system_elfs, get_cfi
from elftools.dwarf import callframe
import multiprocessing
import signal
import itertools
import random
from stats_accu import StatsAccumulator
from stats_accu import \
StatsAccumulator, SingleFdeData, \
RegsList, FdeData, DwarfInstr
class FilesProcessor:
def __init__(self, cores, stats_accu=None):
class FilesProcessor(multiprocessing.Process):
def __init__(self, elf_list, shared_queue):
super().__init__()
self.stop_processing = False
self._processed_counter = itertools.count()
self.cores = cores
if stats_accu is None:
stats_accu = StatsAccumulator()
self.stats_accu = stats_accu
self.processed_counter = 0
self.elf_list = elf_list
self.shared_queue = shared_queue
def stop_processing_now(self):
self.stop_processing = True
def next_counter(self):
return self._processed_counter.__next__()
def run(self):
pos = 0
for descr in self.elf_list:
if self.stop_processing:
break
self.process_single_file(descr, pos)
pos += 1
def run(self, elf_list):
self.elf_count = len(elf_list)
with pathos.multiprocessing.ProcessPool(nodes=self.cores) as pool:
pool.map(self.process_single_file, elf_list)
print("=== Finished {} ===".format(self.name))
return 0
def process_single_file(self, elf_path):
def process_single_file(self, elf_descr, pos_in_list):
if self.stop_processing:
return
cur_file_count = self.next_counter()
print('> [{}/{} {:.0f}%] {}'.format(
cur_file_count, self.elf_count,
cur_file_count / self.elf_count * 100, elf_path))
self.stats_accu.process_file(elf_path)
elf_path, elf_type = elf_descr
self.processed_counter += 1
print('[{}, {}/{}] {}'.format(
self.shared_queue.qsize(),
pos_in_list + 1,
len(self.elf_list),
elf_path))
self.process_file(elf_path, elf_type)
def process_file(self, path, elftype):
''' Process a single file '''
cfi = get_cfi(path)
if not cfi:
return None
data = FdeData()
for entry in cfi:
if isinstance(entry, callframe.CIE): # Is a CIE
self.process_cie(entry, data)
elif isinstance(entry, callframe.FDE): # Is a FDE
self.process_fde(entry, data)
out = SingleFdeData(path, elftype, data)
self.shared_queue.put(out)
def incr_cell(self, table, key):
''' Increments table[key], or sets it to 1 if unset '''
if key in table:
table[key] += 1
else:
table[key] = 1
def process_cie(self, cie, data):
''' Process a CIE '''
pass # Nothing needed from a CIE
def process_fde(self, fde, data):
''' Process a FDE '''
data.fde_count += 1
decoded = fde.get_decoded()
row_count = len(decoded.table)
self.incr_cell(data.fde_with_lines, row_count)
for row in decoded.table:
self.process_reg(data.regs.cfa, row['cfa'])
for entry in row:
if isinstance(entry, int):
self.process_reg(data.regs.regs[entry], row[entry])
def process_reg(self, out_reg, reg_def):
''' Process a register '''
if isinstance(reg_def, callframe.CFARule):
if reg_def.reg is not None:
out_reg.regs[reg_def.reg] += 1
else:
pass # TODO exprs
else:
self.incr_cell(out_reg.instrs, DwarfInstr.of_pyelf(reg_def.type))
if reg_def.type == callframe.RegisterRule.REGISTER:
out_reg.regs[reg_def.arg] += 1
elif (reg_def.type == callframe.RegisterRule.EXPRESSION) \
or (reg_def.type == callframe.RegisterRule.VAL_EXPRESSION):
pass # TODO exprs
def gather_system_files(config):
def gather_system_files(config, sample_size=None):
stats_accu = StatsAccumulator()
processor = FilesProcessor(config.cores, stats_accu)
processors = []
def signal_graceful_exit(sig, frame):
''' Stop gracefully now '''
nonlocal processor
nonlocal processors
print("Stopping after this ELF…")
processor.stop_processing_now()
for processor in processors:
processor.stop_processing_now()
signal.signal(signal.SIGINT, signal_graceful_exit)
@ -55,6 +122,50 @@ def gather_system_files(config):
for elf_path in system_elfs():
elf_list.append(elf_path)
processor.run(elf_list)
if sample_size is not None:
elf_list_sampled = random.sample(elf_list, sample_size)
elf_list = elf_list_sampled
elf_count = len(elf_list)
elf_per_process = elf_count // config.cores
elf_list_slices = []
for i in range(config.cores - 1):
elf_list_slices.append(
elf_list[i * elf_per_process : (i+1) * elf_per_process])
elf_list_slices.append(
elf_list[(config.cores - 1) * elf_per_process
: config.cores * elf_per_process])
shared_queue = multiprocessing.Queue(elf_count)
for elf_range in elf_list_slices:
processors.append(FilesProcessor(elf_range, shared_queue))
if config.cores > 1:
for processor in processors:
processor.start()
while True:
for processor in processors:
if processor.is_alive():
print("== Waiting {} ({} {}) ==".format(
processor.name, processor.exitcode,
processor.is_alive()))
processor.join(timeout=1)
if processor.exitcode is None:
break # Loop around
print("== Joined {} ==".format(processor.name))
terminated = True
for processor in processors:
if processor.exitcode is None:
terminated = False
if terminated:
break
else:
processors[0].run() # run(), not start(): in the same thread
while not shared_queue.empty(): # Reliable because everything is joined
stats_accu.add_fde(shared_queue.get_nowait())
return stats_accu

View File

@ -2,6 +2,7 @@
from elftools.elf.elffile import ELFFile
from elftools.common.exceptions import ELFError, DWARFError
from stats_accu import ElfType
import os
@ -44,20 +45,20 @@ def system_elfs():
os.readlink(path)))
sysbin_dirs = [
'/lib',
'/usr/lib',
'/usr/local/lib',
'/bin',
'/usr/bin',
'/usr/local/bin',
'/sbin',
('/lib', ElfType.ELF_LIB),
('/usr/lib', ElfType.ELF_LIB),
('/usr/local/lib', ElfType.ELF_LIB),
('/bin', ElfType.ELF_BINARY),
('/usr/bin', ElfType.ELF_BINARY),
('/usr/local/bin', ElfType.ELF_BINARY),
('/sbin', ElfType.ELF_BINARY),
]
to_explore = sysbin_dirs
seen_elfs = set()
while to_explore:
bindir = to_explore.pop()
bindir, elftype = to_explore.pop()
if not os.path.isdir(bindir):
continue
@ -65,12 +66,23 @@ def system_elfs():
for direntry in os.scandir(bindir):
if not direntry.is_file():
if direntry.is_dir():
to_explore.append(direntry.path)
to_explore.append((direntry.path, elftype))
continue
canonical_name = readlink_rec(direntry.path)
if canonical_name in seen_elfs:
continue
valid_elf = True
try:
with open(canonical_name, 'rb') as handle:
magic_bytes = handle.read(4)
if magic_bytes != b'\x7fELF':
valid_elf = False
except Exception:
continue
if not valid_elf:
continue
seen_elfs.add(canonical_name)
yield canonical_name
yield (canonical_name, elftype)

View File

@ -1,9 +1,9 @@
from elftools.dwarf import callframe
from pyelftools_overlay import get_cfi
from enum import Enum
import json
import enum
import subprocess
import re
import json
import collections
from math import ceil
@ -69,109 +69,195 @@ def elf_so_deps(path):
"{}.").format(path, exn.returncode))
class ElfType(Enum):
ELF_LIB = auto()
ELF_BINARY = auto()
class ElfType(enum.Enum):
ELF_LIB = enum.auto()
ELF_BINARY = enum.auto()
class DwarfInstr(enum.Enum):
@staticmethod
def of_pyelf(val):
_table = {
callframe.RegisterRule.UNDEFINED: DwarfInstr.INSTR_UNDEF,
callframe.RegisterRule.SAME_VALUE: DwarfInstr.INSTR_SAME_VALUE,
callframe.RegisterRule.OFFSET: DwarfInstr.INSTR_OFFSET,
callframe.RegisterRule.VAL_OFFSET: DwarfInstr.INSTR_VAL_OFFSET,
callframe.RegisterRule.REGISTER: DwarfInstr.INSTR_REGISTER,
callframe.RegisterRule.EXPRESSION: DwarfInstr.INSTR_EXPRESSION,
callframe.RegisterRule.VAL_EXPRESSION:
DwarfInstr.INSTR_VAL_EXPRESSION,
callframe.RegisterRule.ARCHITECTURAL:
DwarfInstr.INSTR_ARCHITECTURAL,
}
return _table[val]
INSTR_UNDEF = enum.auto()
INSTR_SAME_VALUE = enum.auto()
INSTR_OFFSET = enum.auto()
INSTR_VAL_OFFSET = enum.auto()
INSTR_REGISTER = enum.auto()
INSTR_EXPRESSION = enum.auto()
INSTR_VAL_EXPRESSION = enum.auto()
INSTR_ARCHITECTURAL = enum.auto()
def intify_dict(d):
out = {}
for key in d:
try:
nKey = int(key)
except Exception:
nKey = key
try:
out[nKey] = int(d[key])
except ValueError:
out[nKey] = d[key]
return out
class RegData:
def __init__(self, instrs=None, regs=None, exprs=None):
if instrs is None:
instrs = {}
if regs is None:
regs = [0]*17
if exprs is None:
exprs = {}
self.instrs = intify_dict(instrs)
self.regs = regs
self.exprs = intify_dict(exprs)
@staticmethod
def map_dict_keys(fnc, dic):
out = {}
for key in dic:
out[fnc(key)] = dic[key]
return out
def dump(self):
return {
'instrs': RegData.map_dict_keys(lambda x: x.value, self.instrs),
'regs': self.regs,
'exprs': self.exprs,
}
@staticmethod
def load(data):
return RegData(
instrs=RegData.map_dict_keys(
lambda x: DwarfInstr(int(x)),
data['instrs']),
regs=data['regs'],
exprs=data['exprs'],
)
class RegsList:
def __init__(self, cfa=None, regs=None):
if cfa is None:
cfa = RegsList.fresh_reg()
if regs is None:
regs = [RegsList.fresh_reg() for _ in range(17)]
self.cfa = cfa
self.regs = regs
@staticmethod
def fresh_reg():
return RegData()
def dump(self):
return {
'cfa': RegData.dump(self.cfa),
'regs': [RegData.dump(r) for r in self.regs],
}
@staticmethod
def load(data):
return RegsList(
cfa=RegData.load(data['cfa']),
regs=[RegData.load(r) for r in data['regs']],
)
class FdeData:
def __init__(self, fde_count=0, fde_with_lines=None, regs=None):
if fde_with_lines is None:
fde_with_lines = {}
if regs is None:
regs = RegsList()
self.fde_count = fde_count
self.fde_with_lines = intify_dict(fde_with_lines)
self.regs = regs
def dump(self):
return {
'fde_count': self.fde_count,
'fde_with_lines': self.fde_with_lines,
'regs': self.regs.dump(),
}
@staticmethod
def load(data):
return FdeData(
fde_count=int(data['fde_count']),
fde_with_lines=data['fde_with_lines'],
regs=RegsList.load(data['regs']))
class SingleFdeData:
def __init__(self, path, elf_type, data):
self.path = path
self.elf_type = elf_type
self.data = data
self.data = data # < of type FdeData
self.gather_deps()
def gather_deps(self):
""" Collect ldd data on the binary """
self.deps = elf_so_deps(self.path)
# self.deps = elf_so_deps(self.path)
self.deps = []
def dump(self):
return {
'path': self.path,
'elf_type': self.elf_type.value,
'data': self.data.dump()
}
@staticmethod
def load(data):
return SingleFdeData(
data['path'],
ElfType(int(data['elf_type'])),
FdeData.load(data['data']))
class StatsAccumulator:
def __init__(self):
self.elf_count = 0
self.fde_count = 0
self.fde_row_count = 0
self.fde_with_n_rows = {}
self.fdes = []
def serialize(self, path):
''' Save the gathered data to `stream` '''
def add_fde(self, fde_data):
self.fdes.append(fde_data)
notable_fields = [
'elf_count',
'fde_count',
'fde_row_count',
'fde_with_n_rows',
]
out = {}
for field in notable_fields:
out[field] = self.__dict__[field]
def get_fdes(self):
return self.fdes
with open(path, 'wb') as stream:
json.dump(out, stream)
def add_stats_accu(self, stats_accu):
for fde in stats_accu.get_fdes():
self.add_fde(fde)
def dump(self, path):
dict_form = [fde.dump() for fde in self.fdes]
print(dict_form)
with open(path, 'w') as handle:
handle.write(json.dumps(dict_form))
@staticmethod
def unserialize(path):
def load(path):
with open(path, 'r') as handle:
text = handle.read()
out = StatsAccumulator()
with open(path, 'wb') as stream:
data = json.load(stream)
for field in data:
out.field = data[field]
out.fdes = [SingleFdeData.load(data) for data in json.loads(text)]
return out
def report(self):
''' Report on the statistics gathered '''
self.fde_rows_proportion = ProportionFinder(
self.fde_with_n_rows)
rows = [
("ELFs analyzed", self.elf_count),
("FDEs analyzed", self.fde_count),
("FDE rows analyzed", self.fde_row_count),
("Avg. rows per FDE", self.fde_row_count / self.fde_count),
("Median rows per FDE",
self.fde_rows_proportion.find_at_proportion(0.5)),
("Max rows per FDE", max(self.fde_with_n_rows.keys())),
]
title_size = max(map(lambda x: len(x[0]), rows))
line_format = "{:<" + str(title_size + 1) + "} {}"
for row in rows:
print(line_format.format(row[0], row[1]))
def process_file(self, path):
''' Process a single file '''
cfi = get_cfi(path)
if not cfi:
return
self.elf_count += 1
for entry in cfi:
if isinstance(entry, callframe.CIE): # Is a CIE
self.process_cie(entry)
elif isinstance(entry, callframe.FDE): # Is a FDE
self.process_fde(entry)
def incr_cell(self, table, key):
''' Increments table[key], or sets it to 1 if unset '''
if key in table:
table[key] += 1
else:
table[key] = 1
def process_cie(self, cie):
''' Process a CIE '''
pass # Nothing needed from a CIE
def process_fde(self, fde):
''' Process a FDE '''
self.fde_count += 1
decoded = fde.get_decoded()
row_count = len(decoded.table)
self.fde_row_count += row_count
self.incr_cell(self.fde_with_n_rows, row_count)