summaryrefslogtreecommitdiffstats
path: root/test/strategy.py
blob: 8aa58151f6b89dd3e37c5c4fc98710186f4cd584 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import enum

import attr
import pytest
import subprocess
import os
import shutil
import sys

from labgrid import target_factory, step, driver
from labgrid.strategy import Strategy, StrategyError

class Status(enum.Enum):
    unknown = 0
    off = 1
    barebox = 2
    qemu_dry_run = 3
    qemu_interactive = 4

@target_factory.reg_driver
@attr.s(eq=False)
class BareboxTestStrategy(Strategy):
    """BareboxTestStrategy - Strategy to switch to barebox"""
    bindings = {
        "power": "PowerProtocol",
        "console": "ConsoleProtocol",
        "barebox": "BareboxDriver",
    }

    status = attr.ib(default=Status.unknown)
    qemu = None

    def __attrs_post_init__(self):
        super().__attrs_post_init__()
        if isinstance(self.console, driver.QEMUDriver):
            self.qemu = self.console
        self.patchtools()

    @step(args=['status'])
    def transition(self, status, *, step):
        if not isinstance(status, Status):
            status = Status[status]
        if status == Status.unknown:
            raise StrategyError("can not transition to {}".format(status))
        elif status == self.status:
            step.skip("nothing to do")
            return  # nothing to do
        elif status == Status.off:
            self.target.deactivate(self.console)
            self.target.activate(self.power)
            self.power.off()
        elif status == Status.barebox:
            self.transition(Status.off)  # pylint: disable=missing-kwoa
            self.target.activate(self.console)
            # cycle power
            self.power.cycle()
            # interrupt barebox
            self.target.activate(self.barebox)
        else:
            raise StrategyError(
                "no transition found from {} to {}".
                format(self.status, status)
            )
        self.status = status

    def force(self, state):
        self.transition(Status.off)  # pylint: disable=missing-kwoa

        if state == "qemu_dry_run" or state == "qemu_interactive":
            cmd = self.get_qemu_base_args()

            cmd.append("-serial")
            cmd.append("mon:stdio")
            cmd.append("-trace")
            cmd.append("file=/dev/null")

            with open("/dev/tty", "r+b", buffering=0) as tty:
                tty.write(bytes("running: \n{}\n".format(quote_cmd(cmd)), "UTF-8"))
                if state == "qemu_dry_run":
                    pytest.exit('Dry run. Terminating.')
                subprocess.run(cmd, stdin=tty, stdout=tty, stderr=tty)

            pytest.exit('Interactive session terminated')
        else:
            pytest.exit('Can only force to: qemu_dry_run, qemu_interactive')

    def get_qemu_base_args(self):
        if self.qemu is None:
            pytest.exit('interactive mode only supported with QEMUDriver')

        try:
            # https://github.com/labgrid-project/labgrid/pull/1212
            cmd = self.qemu.get_qemu_base_args()
        except AttributeError:
            self.qemu.on_activate()
            orig = self.qemu._cmd
            cmd = []

            list_iter = enumerate(orig)
            for i, opt in list_iter:
                if opt == "-S":
                    continue
                opt2 = double_opt(opt, orig, i)
                if (opt2.startswith("-chardev socket,id=serialsocket") or
                   opt2 == "-serial chardev:serialsocket" or
                   opt2 == "-qmp stdio"):
                    # skip over two elements at once
                    next(list_iter, None)
                    continue

                cmd.append(opt)

        return cmd

    def patchtools(self):
        # https://github.com/labgrid-project/labgrid/commit/69fd553c6969526b609d0be6bb81f0c35f08d1d0
        if self.qemu is None:
            return

        if 'tools' not in self.target.env.config.data:
            self.target.env.config.data['tools'] = {}
        self.target.env.config.data["tools"][self.qemu.qemu_bin] = \
                shutil.which(self.qemu.qemu_bin)

    def append_qemu_args(self, *args):
        if self.qemu is None:
            pytest.exit('Qemu option supplied for non-Qemu target')
        for arg in args:
            self.console.extra_args += " " + arg

def quote_cmd(cmd):
    quoted = map(lambda s : s if s.find(" ") == -1 else "'" + s + "'", cmd)
    return " ".join(quoted)

def double_opt(opt, orig, i):
    if opt == orig[-1]:
        return opt
    return " ".join([opt, orig[i + 1]])