make monkey driver and farmer python files pylint and pep8 clean

This commit is contained in:
Vincent Sanders 2019-07-04 22:44:58 +01:00
parent e01bbee405
commit ccf336bb4a
2 changed files with 259 additions and 141 deletions

View File

@ -20,10 +20,16 @@
runs tests in monkey as defined in a yaml file
"""
import sys, getopt, yaml, time
# pylint: disable=locally-disabled, missing-docstring
import sys
import getopt
import time
import yaml
from monkeyfarmer import Browser
class DriverBrowser(Browser):
def __init__(self, *args, **kwargs):
super(DriverBrowser, self).__init__(*args, **kwargs)
@ -35,10 +41,12 @@ class DriverBrowser(Browser):
def remove_auth(self, url, realm, username, password):
keep = []
def matches(a,b):
if a is None or b is None:
def matches(first, second):
if first is None or second is None:
return True
return a == b
return first == second
for (iurl, irealm, iusername, ipassword) in self.auth:
if not (matches(url, iurl) or
matches(realm, irealm) or
@ -52,10 +60,10 @@ class DriverBrowser(Browser):
# We must logwin.send_{username,password}(xxx)
# We may logwin.go()
# We may logwin.destroy()
def matches(a,b):
if a is None or b is None:
def matches(first, second):
if first is None or second is None:
return True
return a == b
return first == second
candidates = []
for (url, realm, username, password) in self.auth:
score = 0
@ -84,20 +92,24 @@ class DriverBrowser(Browser):
def remove_cert(self, url):
keep = []
def matches(a,b):
if a is None or b is None:
def matches(first, second):
if first is None or second is None:
return True
return a == b
return first == second
for iurl in self.cert:
if not (matches(url, iurl)):
if not matches(url, iurl):
keep.append(iurl)
self.cert = keep
def handle_ready_sslcert(self, cwin):
def matches(a,b):
if a is None or b is None:
def matches(first, second):
if first is None or second is None:
return True
return a == b
return first == second
candidates = []
for url in self.cert:
score = 0
@ -114,16 +126,21 @@ class DriverBrowser(Browser):
print("SSLCert: No candidate found, cancelling sslcert box")
cwin.destroy()
def print_usage():
print('Usage:')
print(' ' + sys.argv[0] + ' -m <path to monkey> -t <path to test>')
print(' ' + sys.argv[0] + ' -m <path to monkey> -t <path to test> [-w <wrapper arguments>]')
def parse_argv(argv):
# pylint: disable=locally-disabled, unused-variable
path_monkey = ''
path_test = ''
wrapper = None
try:
opts, args = getopt.getopt(argv,"hm:t:w:",["monkey=","test=","wrapper="])
opts, args = getopt.getopt(argv, "hm:t:w:", ["monkey=", "test=", "wrapper="])
except getopt.GetoptError:
print_usage()
sys.exit(2)
@ -149,24 +166,35 @@ def parse_argv(argv):
return path_monkey, path_test, wrapper
def load_test_plan(path):
# pylint: disable=locally-disabled, broad-except
plan = []
with open(path, 'r') as stream:
try:
plan = (yaml.load(stream))
except:
print (exc)
except Exception as exc:
print(exc)
return plan
def get_indent(ctx):
return ' ' * ctx["depth"];
return ' ' * ctx["depth"]
def print_test_plan_info(ctx, plan):
# pylint: disable=locally-disabled, unused-argument
print('Running test: [' + plan["group"] + '] ' + plan["title"])
def assert_browser(ctx):
assert(ctx['browser'].started)
assert(not ctx['browser'].stopped)
assert ctx['browser'].started
assert not ctx['browser'].stopped
def conds_met(ctx, conds):
# for each condition listed determine if they have been met
@ -176,14 +204,14 @@ def conds_met(ctx, conds):
timer = cond['timer']
elapsed = cond['elapsed']
assert_browser(ctx)
assert(ctx['timers'].get(timer) is not None)
assert ctx['timers'].get(timer) is not None
taken = time.time() - ctx['timers'][timer]["start"]
if (taken >= elapsed):
if taken >= elapsed:
return True
elif 'window' in cond.keys():
status = cond['status']
window = cond['window']
assert(status == "complete") # TODO: Add more status support?
assert status == "complete" # TODO: Add more status support?
if window == "*all*":
# all windows must be not throbbing
throbbing = False
@ -201,11 +229,15 @@ def conds_met(ctx, conds):
return False
def run_test_step_action_launch(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
assert(ctx.get('browser') is None)
assert(ctx.get('windows') is None)
ctx['browser'] = DriverBrowser(monkey_cmd=[ctx["monkey"]], quiet=True, wrapper=ctx.get("wrapper"))
assert ctx.get('browser') is None
assert ctx.get('windows') is None
ctx['browser'] = DriverBrowser(
monkey_cmd=[ctx["monkey"]],
quiet=True,
wrapper=ctx.get("wrapper"))
assert_browser(ctx)
ctx['windows'] = dict()
for option in step.get('options', []):
@ -214,21 +246,29 @@ def run_test_step_action_launch(ctx, step):
def run_test_step_action_window_new(ctx, step):
# pylint: disable=locally-disabled, invalid-name
print(get_indent(ctx) + "Action: " + step["action"])
tag = step['tag']
assert_browser(ctx)
assert(ctx['windows'].get(tag) is None)
assert ctx['windows'].get(tag) is None
ctx['windows'][tag] = ctx['browser'].new_window(url=step.get('url'))
def run_test_step_action_window_close(ctx, step):
# pylint: disable=locally-disabled, invalid-name
print(get_indent(ctx) + "Action: " + step["action"])
assert_browser(ctx)
tag = step['window']
assert(ctx['windows'].get(tag) is not None)
assert ctx['windows'].get(tag) is not None
win = ctx['windows'].pop(tag)
win.kill()
win.wait_until_dead()
assert(win.alive == False)
assert not win.alive
def run_test_step_action_navigate(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
@ -237,26 +277,28 @@ def run_test_step_action_navigate(ctx, step):
url = step['url']
elif 'repeaturl' in step.keys():
repeat = ctx['repeats'].get(step['repeaturl'])
assert(repeat is not None)
assert(repeat.get('values') is not None)
assert repeat is not None
assert repeat.get('values') is not None
url = repeat['values'][repeat['i']]
else:
url = None
assert(url is not None)
assert url is not None
tag = step['window']
print(get_indent(ctx) + " " + tag + " --> " + url)
win = ctx['windows'].get(tag)
assert(win is not None)
assert win is not None
win.go(url)
def run_test_step_action_stop(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
assert_browser(ctx)
tag = step['window']
win = ctx['windows'].get(tag)
assert(win is not None)
assert win is not None
win.stop()
def run_test_step_action_sleep_ms(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
conds = step['conditions']
@ -264,7 +306,7 @@ def run_test_step_action_sleep_ms(ctx, step):
sleep = 0
have_repeat = False
if isinstance(sleep_time, str):
assert(ctx['repeats'].get(sleep_time) is not None)
assert ctx['repeats'].get(sleep_time) is not None
repeat = ctx['repeats'].get(sleep_time)
sleep = repeat["i"] / 1000
start = repeat["start"]
@ -286,19 +328,21 @@ def run_test_step_action_sleep_ms(ctx, step):
else:
ctx['browser'].farmer.loop(once=True)
def run_test_step_action_block(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
conds = step['conditions']
assert_browser(ctx)
while not conds_met(ctx, conds):
ctx['browser'].farmer.loop(once=True)
def run_test_step_action_repeat(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
tag = step['tag']
assert(ctx['repeats'].get(tag) is None)
ctx['repeats'][tag] = { "loop": True, }
assert ctx['repeats'].get(tag) is None
ctx['repeats'][tag] = {"loop": True, }
if 'min' in step.keys():
ctx['repeats'][tag]["i"] = step["min"]
@ -318,14 +362,15 @@ def run_test_step_action_repeat(ctx, step):
while ctx['repeats'][tag]["loop"]:
ctx['repeats'][tag]["start"] = time.time()
ctx["depth"] += 1
for s in step["steps"]:
run_test_step(ctx, s)
for stp in step["steps"]:
run_test_step(ctx, stp)
ctx['repeats'][tag]["i"] += ctx['repeats'][tag]["step"]
if ctx['repeats'][tag]['values'] is not None:
if ctx['repeats'][tag]["i"] >= len(ctx['repeats'][tag]['values']):
ctx['repeats'][tag]["loop"] = False
ctx["depth"] -= 1
def run_test_step_action_plot_check(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
assert_browser(ctx)
@ -334,69 +379,83 @@ def run_test_step_action_plot_check(ctx, step):
checks = step['checks']
else:
checks = {}
all_text = []
all_text_list = []
bitmaps = []
for plot in win.redraw():
if plot[0] == 'TEXT':
all_text.extend(plot[6:])
all_text_list.extend(plot[6:])
if plot[0] == 'BITMAP':
bitmaps.append(plot[1:])
all_text = " ".join(all_text)
all_text = " ".join(all_text_list)
for check in checks:
if 'text-contains' in check.keys():
print("Check {} in {}".format(repr(check['text-contains']),repr(all_text)))
assert(check['text-contains'] in all_text)
print("Check {} in {}".format(repr(check['text-contains']), repr(all_text)))
assert check['text-contains'] in all_text
elif 'text-not-contains' in check.keys():
print("Check {} NOT in {}".format(repr(check['text-not-contains']),repr(all_text)))
assert(check['text-not-contains'] not in all_text)
print("Check {} NOT in {}".format(repr(check['text-not-contains']), repr(all_text)))
assert check['text-not-contains'] not in all_text
elif 'bitmap-count' in check.keys():
print("Check bitmap count is {}".format(int(check['bitmap-count'])))
assert(len(bitmaps) == int(check['bitmap-count']))
assert len(bitmaps) == int(check['bitmap-count'])
else:
raise AssertionError("Unknown check: {}".format(repr(check)))
def run_test_step_action_timer_start(ctx, step):
# pylint: disable=locally-disabled, invalid-name
print(get_indent(ctx) + "Action: " + step["action"])
tag = step['timer']
assert_browser(ctx)
assert(ctx['timers'].get(tag) is None)
assert ctx['timers'].get(tag) is None
ctx['timers'][tag] = {}
ctx['timers'][tag]["start"] = time.time()
def run_test_step_action_timer_restart(ctx, step):
# pylint: disable=locally-disabled, invalid-name
print(get_indent(ctx) + "Action: " + step["action"])
timer = step['timer']
assert_browser(ctx)
assert(ctx['timers'].get(timer) is not None)
assert ctx['timers'].get(timer) is not None
taken = time.time() - ctx['timers'][timer]["start"]
print("{} {} restarted at: {:.2f}s".format(get_indent(ctx), timer, taken))
ctx['timers'][timer]["taken"] = taken
ctx['timers'][timer]["start"] = time.time()
def run_test_step_action_timer_stop(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
timer = step['timer']
assert_browser(ctx)
assert(ctx['timers'].get(timer) is not None)
assert ctx['timers'].get(timer) is not None
taken = time.time() - ctx['timers'][timer]["start"]
print("{} {} took: {:.2f}s".format(get_indent(ctx), timer, taken))
ctx['timers'][timer]["taken"] = taken
def run_test_step_action_timer_check(ctx, step):
# pylint: disable=locally-disabled, invalid-name
print(get_indent(ctx) + "Action: " + step["action"])
condition = step["condition"].split()
assert(len(condition) == 3)
assert len(condition) == 3
timer1 = ctx['timers'].get(condition[0])
timer2 = ctx['timers'].get(condition[2])
assert(timer1 is not None)
assert(timer2 is not None)
assert(timer1["taken"] is not None)
assert(timer2["taken"] is not None)
assert(condition[1] in ('<', '>'))
assert timer1 is not None
assert timer2 is not None
assert timer1["taken"] is not None
assert timer2["taken"] is not None
assert condition[1] in ('<', '>')
if condition[1] == '<':
assert(timer1["taken"] < timer2["taken"])
assert timer1["taken"] < timer2["taken"]
elif condition[1] == '>':
assert(timer1["taken"] > timer2["taken"])
assert timer1["taken"] > timer2["taken"]
def run_test_step_action_add_auth(ctx, step):
print(get_indent(ctx) + "Action:" + step["action"])
@ -407,12 +466,16 @@ def run_test_step_action_add_auth(ctx, step):
def run_test_step_action_remove_auth(ctx, step):
# pylint: disable=locally-disabled, invalid-name
print(get_indent(ctx) + "Action:" + step["action"])
assert_browser(ctx)
browser = ctx['browser']
browser.remove_auth(step.get("url"), step.get("realm"),
step.get("username"), step.get("password"))
def run_test_step_action_add_cert(ctx, step):
print(get_indent(ctx) + "Action:" + step["action"])
assert_browser(ctx)
@ -421,6 +484,9 @@ def run_test_step_action_add_cert(ctx, step):
def run_test_step_action_remove_cert(ctx, step):
# pylint: disable=locally-disabled, invalid-name
print(get_indent(ctx) + "Action:" + step["action"])
assert_browser(ctx)
browser = ctx['browser']
@ -430,18 +496,16 @@ def run_test_step_action_remove_cert(ctx, step):
def run_test_step_action_clear_log(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
assert_browser(ctx)
browser = ctx['browser']
tag = step['window']
print(get_indent(ctx) + " " + tag + " Log cleared")
win = ctx['windows'].get(tag)
assert(win is not None)
assert win is not None
win.clear_log()
def run_test_step_action_wait_log(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
assert_browser(ctx)
browser = ctx['browser']
tag = step['window']
source = step.get('source')
foldable = step.get('foldable')
@ -449,19 +513,18 @@ def run_test_step_action_wait_log(ctx, step):
substr = step.get('substring')
print(get_indent(ctx) + " " + tag + " Wait for logging")
win = ctx['windows'].get(tag)
assert(win is not None)
assert win is not None
win.wait_for_log(source=source, foldable=foldable, level=level, substr=substr)
def run_test_step_action_js_exec(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
assert_browser(ctx)
browser = ctx['browser']
tag = step['window']
cmd = step['cmd']
print(get_indent(ctx) + " " + tag + " Run " + cmd)
win = ctx['windows'].get(tag)
assert(win is not None)
assert win is not None
win.js_exec(cmd)
@ -469,35 +532,37 @@ def run_test_step_action_quit(ctx, step):
print(get_indent(ctx) + "Action: " + step["action"])
assert_browser(ctx)
browser = ctx.pop('browser')
windows = ctx.pop('windows')
assert(browser.quit_and_wait())
assert browser.quit_and_wait()
step_handlers = {
"launch": run_test_step_action_launch,
"window-new": run_test_step_action_window_new,
"window-close": run_test_step_action_window_close,
"navigate": run_test_step_action_navigate,
"stop": run_test_step_action_stop,
"sleep-ms": run_test_step_action_sleep_ms,
"block": run_test_step_action_block,
"repeat": run_test_step_action_repeat,
"timer-start": run_test_step_action_timer_start,
"timer-restart":run_test_step_action_timer_restart,
"timer-stop": run_test_step_action_timer_stop,
"timer-check": run_test_step_action_timer_check,
"plot-check": run_test_step_action_plot_check,
"add-auth": run_test_step_action_add_auth,
"remove-auth": run_test_step_action_remove_auth,
"add-cert": run_test_step_action_add_cert,
"remove-cert": run_test_step_action_remove_cert,
"clear-log": run_test_step_action_clear_log,
"wait-log": run_test_step_action_wait_log,
"js-exec": run_test_step_action_js_exec,
"quit": run_test_step_action_quit,
STEP_HANDLERS = {
"launch": run_test_step_action_launch,
"window-new": run_test_step_action_window_new,
"window-close": run_test_step_action_window_close,
"navigate": run_test_step_action_navigate,
"stop": run_test_step_action_stop,
"sleep-ms": run_test_step_action_sleep_ms,
"block": run_test_step_action_block,
"repeat": run_test_step_action_repeat,
"timer-start": run_test_step_action_timer_start,
"timer-restart": run_test_step_action_timer_restart,
"timer-stop": run_test_step_action_timer_stop,
"timer-check": run_test_step_action_timer_check,
"plot-check": run_test_step_action_plot_check,
"add-auth": run_test_step_action_add_auth,
"remove-auth": run_test_step_action_remove_auth,
"add-cert": run_test_step_action_add_cert,
"remove-cert": run_test_step_action_remove_cert,
"clear-log": run_test_step_action_clear_log,
"wait-log": run_test_step_action_wait_log,
"js-exec": run_test_step_action_js_exec,
"quit": run_test_step_action_quit,
}
def run_test_step(ctx, step):
step_handlers[step["action"]](ctx, step)
STEP_HANDLERS[step["action"]](ctx, step)
def walk_test_plan(ctx, plan):
ctx["depth"] = 0
@ -506,16 +571,19 @@ def walk_test_plan(ctx, plan):
for step in plan["steps"]:
run_test_step(ctx, step)
def run_test_plan(ctx, plan):
print_test_plan_info(ctx, plan)
walk_test_plan(ctx, plan)
def run_preloaded_test(path_monkey, plan):
ctx = {
"monkey": path_monkey,
}
run_test_plan(ctx, plan)
def main(argv):
ctx = {}
path_monkey, path_test, wrapper = parse_argv(argv)
@ -524,6 +592,7 @@ def main(argv):
ctx["wrapper"] = wrapper
run_test_plan(ctx, plan)
# Some python weirdness to get to main().
if __name__ == "__main__":
main(sys.argv[1:])

View File

@ -24,16 +24,22 @@ Python code.
"""
# pylint: disable=locally-disabled, missing-docstring
import asyncore
import os
import socket
import subprocess
import time
class MonkeyFarmer(asyncore.dispatcher):
# pylint: disable=locally-disabled, too-many-instance-attributes
def __init__(self, monkey_cmd, online, quiet=False, *, wrapper=None):
(mine, monkeys) = socket.socketpair()
asyncore.dispatcher.__init__(self, sock=mine)
if wrapper is not None:
@ -65,7 +71,6 @@ class MonkeyFarmer(asyncore.dispatcher):
def handle_close(self):
# the pipe to the monkey process has closed
self.close()
def handle_read(self):
got = self.recv(8192)
@ -75,7 +80,8 @@ class MonkeyFarmer(asyncore.dispatcher):
if self.monkey.poll() is None:
self.monkey.terminate()
self.monkey.wait()
self.lines.insert(0, "GENERIC EXIT {}".format(self.monkey.returncode).encode('utf-8'))
self.lines.insert(0, "GENERIC EXIT {}".format(
self.monkey.returncode).encode('utf-8'))
return
self.incoming += got
if b"\n" in self.incoming:
@ -84,7 +90,7 @@ class MonkeyFarmer(asyncore.dispatcher):
self.lines = lines
def writable(self):
return (len(self.buffer) > 0)
return len(self.buffer) > 0
def handle_write(self):
sent = self.send(self.buffer)
@ -94,7 +100,7 @@ class MonkeyFarmer(asyncore.dispatcher):
cmd = (" ".join(args))
if not self.quiet:
print(">>> {}".format(cmd))
self.discussion.append((">",cmd))
self.discussion.append((">", cmd))
cmd = cmd + "\n"
self.buffer += cmd.encode('utf-8')
@ -106,15 +112,15 @@ class MonkeyFarmer(asyncore.dispatcher):
self.online(line)
def schedule_event(self, event, secs=None, when=None):
assert(secs is not None or when is not None)
assert secs is not None or when is not None
if when is None:
when = time.time() + secs
self.scheduled.append((when, event))
self.scheduled.sort(lambda a,b: cmp(a[0],b[0]))
self.scheduled.sort()
def unschedule_event(self, event):
self.scheduled = [x for x in self.scheduled if x[1] != event]
def loop(self, once=False):
if len(self.lines) > 0:
self.monkey_says(self.lines.pop(0))
@ -128,8 +134,8 @@ class MonkeyFarmer(asyncore.dispatcher):
func(self)
now = time.time()
if len(self.scheduled) > 0:
next = self.scheduled[0][0]
asyncore.loop(timeout=next-now, count=1)
next_event = self.scheduled[0][0]
asyncore.loop(timeout=next_event - now, count=1)
else:
asyncore.loop(count=1)
if len(self.lines) > 0:
@ -137,9 +143,17 @@ class MonkeyFarmer(asyncore.dispatcher):
if once:
break
class Browser:
# pylint: disable=locally-disabled, too-many-instance-attributes, dangerous-default-value, invalid-name
def __init__(self, monkey_cmd=["./nsmonkey"], quiet=False, *, wrapper=None):
self.farmer = MonkeyFarmer(monkey_cmd=monkey_cmd, online=self.on_monkey_line, quiet=quiet, wrapper=wrapper)
self.farmer = MonkeyFarmer(
monkey_cmd=monkey_cmd,
online=self.on_monkey_line,
quiet=quiet,
wrapper=wrapper)
self.windows = {}
self.logins = {}
self.sslcerts = {}
@ -161,7 +175,7 @@ class Browser:
def pass_options(self, *opts):
if len(opts) > 0:
self.farmer.tell_monkey("OPTIONS " + (" ".join(['--' + opt for opt in opts])))
def on_monkey_line(self, line):
parts = line.split(" ")
handler = getattr(self, "handle_" + parts[0], None)
@ -175,7 +189,7 @@ class Browser:
self.quit()
self.farmer.loop()
return self.stopped
def handle_GENERIC(self, what, *args):
if what == 'STARTED':
self.started = True
@ -186,9 +200,8 @@ class Browser:
elif what == 'EXIT':
if not self.stopped:
print("Unexpected exit of monkey process with code {}".format(args[0]))
assert(self.stopped)
assert self.stopped
else:
# TODO: Nothing for now?
pass
def handle_WINDOW(self, action, _win, winid, *args):
@ -230,7 +243,7 @@ class Browser:
def handle_PLOT(self, *args):
if self.current_draw_target is not None:
self.current_draw_target.handle_plot(*args)
def new_window(self, url=None):
if url is None:
self.farmer.tell_monkey("WINDOW NEW")
@ -243,18 +256,31 @@ class Browser:
return self.windows[poss_wins.pop()]
def handle_ready_login(self, lwin):
# pylint: disable=locally-disabled, no-self-use
# Override this method to do useful stuff
lwin.destroy()
def handle_ready_sslcert(self, cwin):
# pylint: disable=locally-disabled, no-self-use
# Override this method to do useful stuff
cwin.destroy()
class SSLCertWindow:
# pylint: disable=locally-disabled, invalid-name
def __init__(self, browser, winid, _url, *url):
self.alive = True
self.browser = browser
self.winid = winid
self.url = " ".join(url)
def handle(self, action, _str="STR", *rest):
content = " ".join(rest)
def handle(self, action, _str="STR"):
if action == "DESTROY":
self.alive = False
else:
@ -265,16 +291,20 @@ class SSLCertWindow:
self.browser.farmer.loop(once=True)
def go(self):
assert(self.alive)
assert self.alive
self.browser.farmer.tell_monkey("SSLCERT GO {}".format(self.winid))
self._wait_dead()
def destroy(self):
assert(self.alive)
assert self.alive
self.browser.farmer.tell_monkey("SSLCERT DESTROY {}".format(self.winid))
self._wait_dead()
class LoginWindow:
# pylint: disable=locally-disabled, too-many-instance-attributes, invalid-name
def __init__(self, browser, winid, _url, *url):
self.alive = True
self.ready = False
@ -301,13 +331,13 @@ class LoginWindow:
self.ready = True
def send_username(self, username=None):
assert(self.alive)
assert self.alive
if username is None:
username = self.username
self.browser.farmer.tell_monkey("LOGIN USERNAME {} {}".format(self.winid, username))
def send_password(self, password=None):
assert(self.alive)
assert self.alive
if password is None:
password = self.password
self.browser.farmer.tell_monkey("LOGIN PASSWORD {} {}".format(self.winid, password))
@ -315,19 +345,35 @@ class LoginWindow:
def _wait_dead(self):
while self.alive:
self.browser.farmer.loop(once=True)
def go(self):
assert(self.alive)
assert self.alive
self.browser.farmer.tell_monkey("LOGIN GO {}".format(self.winid))
self._wait_dead()
def destroy(self):
assert(self.alive)
assert self.alive
self.browser.farmer.tell_monkey("LOGIN DESTROY {}".format(self.winid))
self._wait_dead()
class BrowserWindow:
def __init__(self, browser, winid, _for, coreid, _existing, otherid, _newtab, newtab, _clone, clone):
# pylint: disable=locally-disabled, too-many-instance-attributes, too-many-public-methods, invalid-name
def __init__(
self,
browser,
winid,
_for,
coreid,
_existing,
otherid,
_newtab,
newtab,
_clone,
clone):
# pylint: disable=locally-disabled, too-many-arguments
self.alive = True
self.browser = browser
self.winid = winid
@ -361,7 +407,7 @@ class BrowserWindow:
if (time.time() - now) > timeout:
break
def go(self, url, referer = None):
def go(self, url, referer=None):
if referer is None:
self.browser.farmer.tell_monkey("WINDOW GO %s %s" % (
self.winid, url))
@ -387,15 +433,12 @@ class BrowserWindow:
def handle_window_SIZE(self, _width, width, _height, height):
self.width = int(width)
self.height = int(height)
def handle_window_DESTROY(self):
self.alive = False
def handle_window_TITLE(self, _str, *title):
self.title = " ".join(title)
def handle_window_REDRAW(self):
pass
def handle_window_GET_DIMENSIONS(self, _width, width, _height, height):
self.width = width
@ -418,11 +461,12 @@ class BrowserWindow:
self.scrolly = int(y)
def handle_window_UPDATE_BOX(self, _x, x, _y, y, _width, width, _height, height):
# pylint: disable=locally-disabled, no-self-use
x = int(x)
y = int(y)
width = int(width)
height = int(height)
pass
def handle_window_UPDATE_EXTENT(self, _width, width, _height, height):
self.content_width = int(width)
@ -516,9 +560,11 @@ class BrowserWindow:
self.browser.farmer.loop(once=True)
if __name__ == '__main__':
# Simple test is as follows...
def farmer_test():
'''
Simple farmer test
'''
browser = Browser(quiet=True)
win = browser.new_window()
@ -534,11 +580,10 @@ if __name__ == '__main__':
print("Received {} plot commands".format(len(cmds)))
for cmd in cmds:
if cmd[0] == "TEXT":
x = cmd[2]
y = cmd[4]
text_x = cmd[2]
text_y = cmd[4]
rest = " ".join(cmd[6:])
print("{} {} -> {}".format(x,y,rest))
print("{} {} -> {}".format(text_x, text_y, rest))
browser.pass_options("--enable_javascript=1")
win.load_page("file://" + full_fname)
@ -549,10 +594,10 @@ if __name__ == '__main__':
print("Received {} plot commands".format(len(cmds)))
for cmd in cmds:
if cmd[0] == "TEXT":
x = cmd[2]
y = cmd[4]
text_x = cmd[2]
text_y = cmd[4]
rest = " ".join(cmd[6:])
print("{} {} -> {}".format(x,y,rest))
print("{} {} -> {}".format(text_x, text_y, rest))
browser.quit_and_wait()
@ -565,17 +610,17 @@ if __name__ == '__main__':
def handle_ready_sslcert(self, cwin):
cwin.destroy()
browser = FooBarLogin(quiet=True)
win = browser.new_window()
fbbrowser = FooBarLogin(quiet=True)
win = fbbrowser.new_window()
win.load_page("https://httpbin.org/basic-auth/foo/bar")
cmds = win.redraw()
print("Received {} plot commands for auth test".format(len(cmds)))
for cmd in cmds:
if cmd[0] == "TEXT":
x = cmd[2]
y = cmd[4]
text_x = cmd[2]
text_y = cmd[4]
rest = " ".join(cmd[6:])
print("{} {} -> {}".format(x,y,rest))
print("{} {} -> {}".format(text_x, text_y, rest))
fname = "test/js/inserted-script.html"
full_fname = os.path.join(os.getcwd(), fname)
@ -588,6 +633,10 @@ if __name__ == '__main__':
win.wait_for_log(substr="deferred")
#print("Discussion was:")
#for line in browser.farmer.discussion:
# print("Discussion was:")
# for line in browser.farmer.discussion:
# print("{} {}".format(line[0], line[1]))
if __name__ == '__main__':
farmer_test()