243 lines
8.1 KiB
Python
243 lines
8.1 KiB
Python
import logging
|
|
from hunk_changes import InlineLevenshtein, HunkLevenshtein
|
|
|
|
# Module-level logger; VimSession.apply_hunk emits its debug traces through it.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LineMovement:
    """A pending cursor movement to a line, either absolute or relative.

    Exactly one of the two attributes is meaningful at a time:

    * ``absolute`` -- a line number to jump to (typed as ``<n>G``);
    * ``relative`` -- a signed number of lines to move from the current one
      (typed as ``<n>j`` downwards, ``<n>k`` upwards).

    Movements are accumulated with ``+`` and only typed when `do` is called.
    """

    def __init__(self, absolute=None, relative=None):
        """Create a movement.

        Raises:
            ValueError: if both a non-zero absolute and a non-zero relative
                movement are requested.
        """
        self.absolute = absolute
        self.relative = relative
        if self.absolute and self.relative:
            raise ValueError("Cannot move both absolutely and relatively")

    def __add__(self, num):
        """Return a new movement shifted by `num` lines.

        An absolute movement stays absolute; anything else (including a
        movement with neither field set) yields a relative movement.

        Raises:
            TypeError: if `num` is not an integer.
        """
        if not isinstance(num, int):
            raise TypeError("Can only add an integer")
        if self.absolute is not None:
            return self.__class__(absolute=self.absolute + num)
        # A movement with no target at all is "stay here", i.e. relative 0.
        # Returning a real object keeps `mvt += 1` from turning `mvt` into None.
        return self.__class__(relative=(self.relative or 0) + num)

    def do(self, tmux_session):
        """Type the movement into `tmux_session`, then reset to "no movement".

        Nothing is typed when there is nothing to move (relative 0 and no
        absolute target). After the call, the object is a relative movement
        of 0, so further additions count from the line just reached.
        """
        if self.relative:
            # `j` only moves down; a negative count must be typed as `k`.
            motion = "j" if self.relative > 0 else "k"
            tmux_session.type_keys(
                "escape", "{}{}".format(abs(self.relative), motion)
            )
        elif self.absolute:
            tmux_session.type_keys("escape", "{}G".format(self.absolute))
        self.relative = 0
        self.absolute = None
|
|
|
|
|
|
class VimSession:
    """A Vim session instrumented through tmux.

    All editing is performed by sending keystrokes to ``tmux_session``.
    ``mode`` mirrors the Vim mode this object last asked for ("command",
    "insert" or "replace"); it is tracked locally and never read back from
    Vim, so every method that types a command goes through `set_mode` first.
    """

    # When true, `log` writes its messages into a split window inside Vim.
    debug = False

    def __init__(self, tmux_session, file_path=None):
        """Start Vim in `tmux_session`, optionally opening `file_path`."""
        self.tmux_session = tmux_session
        self.file_path = file_path

        self.tmux_session.type_keys("vim")
        if self.file_path:
            self.tmux_session.type_keys(" {}".format(self.file_path))
        self.tmux_session.type_keys("enter")

        self.mode = "command"
        # Whether the log split has already been opened by `log`.
        self.log_open = False

    def log(self, msg):
        """Append `msg` to the in-Vim log window. No-op unless `debug` is set."""
        if not self.debug:
            return

        self.set_mode("command")
        if not self.log_open:
            # Open the log split once, then return to the window below it.
            self.tmux_session.send_keys(":new", "enter", "C-w", "j")
            self.log_open = True

        # Hop into the log window, add the message on a new line, hop back.
        self.tmux_session.send_keys("C-w", "k")
        self.tmux_session.send_keys("o", msg.rstrip(), "escape")
        self.tmux_session.send_keys("C-w", "j")

    def set_mode(self, new_mode, dry_run=False, ofs_balance=True):
        """Set Vim to mode `new_mode` and return the induced column offset.

        Args:
            new_mode: "command", "insert" or "replace".
            dry_run: if true, do not type anything nor change `self.mode`,
                only compute the offset.
            ofs_balance: if true, compensate the offset with a cursor
                movement, so that the returned offset is 0.

        Returns:
            The cursor movement caused by the mode change, in columns: 0 when
            nothing moved or the movement was balanced, -1 when insert/replace
            mode was left without balancing.
        """
        if new_mode == self.mode:
            return 0

        # A balanced change never moves the cursor, so there is nothing to
        # compute for a dry run.
        if dry_run and ofs_balance:
            return 0

        ofs = 0

        if self.mode != "command":
            # Leaving insert/replace mode shifts the cursor one column left.
            ofs -= 1

            if not dry_run:
                self.tmux_session.type_keys("escape")
                self.mode = "command"

        if ofs_balance and ofs:  # implies `not dry_run`
            # `ofs` can only be negative here: move right to compensate.
            self.tmux_session.type_keys("{}l".format(-ofs))
            ofs = 0

        if not dry_run:
            if new_mode == "insert":
                self.tmux_session.type_keys("i")
                self.mode = "insert"
            elif new_mode == "replace":
                self.tmux_session.type_keys("R")
                self.mode = "replace"

        return ofs

    def edit_file(self, file_path):
        """Open `file_path` in the running Vim (`:e`) and remember it."""
        self.set_mode("command")
        self.tmux_session.type_keys(":e ", file_path, "enter")
        self.file_path = file_path

    def apply_patchset(self, patchset):
        """Apply every patch of `patchset`, in iteration order."""
        for patch in patchset:
            self.apply_patch(patch)

    def apply_patch(self, patch):
        """Replay one patch: rename if needed, open the target, apply hunks, save.

        `patch.source` and `patch.target` are decoded as UTF-8 file names;
        iterating over `patch` yields the hunks handed to `apply_hunk`.
        """
        source = patch.source.decode("utf8")
        target = patch.target.decode("utf8")
        if source != target:
            self.set_mode("command")
            # NOTE(review): file names are typed unquoted into a `:!` shell
            # command; names containing spaces or shell/Vim special characters
            # would need escaping -- confirm what the callers can pass.
            self.tmux_session.type_keys(":!mv ", source, " ", target, "enter")
        if self.file_path != target:
            self.edit_file(target)

        for hunk in patch:
            self.apply_hunk(hunk)

        self.set_mode("command")
        self.tmux_session.type_keys(":w", "enter")

    @staticmethod
    def tabify(text):
        """Substitute groups of four spaces by a tabulation, as much as possible."""
        return text.replace("    ", "\t")

    def write_line(self, line):
        """Write `line` as a new line inserted above the cursor line.

        Trailing whitespace is dropped. Leading spaces are not typed one by
        one: they are produced with a counted append (``<n>a `` + escape) and
        the remainder of the line is then appended after them.
        """
        line = line.rstrip()

        # `O` is a command-mode key: make sure it is not typed as text,
        # whichever branch is taken below.
        self.set_mode("command")

        if line.startswith(" "):
            # Only strip the spaces that are re-created by the counted append;
            # any other leading whitespace (e.g. a tab) belongs to the content.
            content = line.lstrip(" ")
            lead_spaces = len(line) - len(content)
            self.tmux_session.type_keys(
                "O",
                "escape",
                "{}a ".format(lead_spaces),
                "escape",
                "A",
                content,
                "escape",
            )
        else:
            self.tmux_session.type_keys("O", line, "escape")

    def subst_line(self, pre, post):
        """Turn the current line from `pre` into `post` with in-line edits.

        The edit script comes from ``InlineLevenshtein(pre, post).compute()``,
        whose ``"ops"`` entries are unpacked as ``(op, (_, span), values, _)``
        with `op` one of "L" (leave), "I" (insert), "D" (delete) or "S"
        (substitute). Vim is left in command mode.
        """
        line_levenshtein = InlineLevenshtein(pre, post).compute()
        ops = line_levenshtein["ops"]

        edit_pos = 1  # 1-based column of the next edit, as used by `|`
        rel_pos = 0  # columns skipped since the cursor was last positioned

        # NOTE(review): the cursor is only positioned once some "L" columns
        # have been skipped; an edit script starting with a non-"L" op relies
        # on the cursor already being on column 1 -- confirm this holds after
        # the line movements performed by the caller.
        for op, (_start, span), values, _cost in ops:
            if op == "L":
                edit_pos += span
                rel_pos += span
                continue

            if rel_pos > 0:
                self.set_mode("command")
                self.tmux_session.type_keys("{}|".format(edit_pos))
                rel_pos = 0

            if op == "I":
                self.set_mode("insert")
                self.tmux_session.type_keys(self.tabify(values))
                edit_pos += span
            elif op == "D":
                self.set_mode("command")
                self.tmux_session.type_keys("x")
            elif op == "S":
                self.set_mode("replace")
                # For a substitution, `values[1]` is the replacement text.
                self.tmux_session.type_keys(self.tabify(values[1]))
                edit_pos += span

        self.set_mode("command")

    def _flush_subhunk(self, pre_lines, post_lines, startline_target):
        """Apply the pending removed/added lines, if any, as one subhunk."""
        if not (pre_lines or post_lines):
            return
        logger.debug(
            "\tApplying subhunk @%s span %s/%s",
            startline_target,
            len(pre_lines),
            len(post_lines),
        )
        self.apply_subhunk(pre_lines, post_lines, startline_target)

    def apply_hunk(self, hunk):
        """Replay one hunk, split into subhunks at every context line.

        `hunk.text` is iterated as UTF-8 encoded unified-diff lines whose first
        character is the operation ("+", "-" or " "). Consecutive "+"/"-"
        lines form a subhunk, applied at the target line where it starts.
        """
        logger.debug("Applying hunk @%s/%s", hunk.startsrc, hunk.starttgt)
        pre_lines = []
        post_lines = []
        cur_subhunk_line = hunk.starttgt
        cur_target_line = hunk.starttgt
        for b_line in hunk.text:
            u_line = b_line.decode("utf8")
            op = u_line[:1]  # slicing: an empty line must not raise
            line = u_line[1:]

            if op == "+":
                post_lines.append(line)
                cur_target_line += 1
            elif op == "-":
                pre_lines.append(line)
            elif op == " " or not u_line.strip():
                # Context line. A blank line also counts: some tools emit
                # empty context lines without their leading space, and
                # skipping them would desynchronise the target line counter.
                self._flush_subhunk(pre_lines, post_lines, cur_subhunk_line)
                cur_target_line += 1
                cur_subhunk_line = cur_target_line
                pre_lines = []
                post_lines = []

        self._flush_subhunk(pre_lines, post_lines, cur_subhunk_line)

    def apply_subhunk(self, pre_lines, post_lines, startline_target):
        """Turn `pre_lines` into `post_lines`, starting at `startline_target`.

        The line-level edit script comes from
        ``HunkLevenshtein(pre_lines, post_lines).compute()``; its ``"ops"``
        entries are ``(op, positions, values, cost)`` with `op` one of "L"
        (leave), "I" (insert), "D" (delete) or "S" (substitute). Cursor
        movements are accumulated and only typed right before an actual edit.
        """
        hunk_levenshtein = HunkLevenshtein(pre_lines, post_lines).compute()
        line_ops = hunk_levenshtein["ops"]

        line_mvt = LineMovement(absolute=startline_target)

        for op, _positions, values, _cost in line_ops:
            if op == "L":
                self.log("LEAVE {} -- L{}".format(values[0].strip(), line_mvt.absolute))
                line_mvt += 1
                continue

            # Movements and `dd` are command-mode keys; keep the tracked mode
            # in sync instead of relying on the previous edit's final state.
            self.set_mode("command")
            line_mvt.do(self.tmux_session)
            if op == "I":
                self.log("INSERT {}".format(values.strip()))
                self.write_line(values)
                # The new line was opened above: step over it to get back to
                # the line the cursor was on.
                line_mvt += 1
            elif op == "D":
                self.log("DELETE {}".format(values.strip()))
                # No movement afterwards: the edit script continues from the
                # line the cursor is left on after `dd`.
                self.tmux_session.type_keys("dd")
            elif op == "S":
                self.log("SUBST {}/{}".format(values[0].strip(), values[1].strip()))
                self.subst_line(values[0].rstrip(), values[1].rstrip())
                line_mvt += 1

    def quit(self):
        """Quit Vim, closing all windows and discarding unsaved changes (`:qa!`)."""
        self.set_mode("command")
        self.tmux_session.type_keys(":qa!", "enter")
|