add controller

This commit is contained in:
Jing Hua
2023-10-24 02:26:24 +08:00
parent 14a4f5c008
commit a7144824d2
6 changed files with 61 additions and 13 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
*.iso
.venv
__pycache__
*.png

12
README.md Normal file
View File

@@ -0,0 +1,12 @@
# Setup guide
1. Download kubuntu from https://kubuntu.org/getkubuntu/
2. Setup virtual machine
1. Create `Host Only Adapter` and add it to the network adapter in the settings
3. Install [xdotool](https://github.com/jordansissel/xdotool) on VM
4. Set up SSH server on VM: https://averagelinuxuser.com/ssh-into-virtualbox/
1. `sudo apt install openssh-server`
2. `sudo systemctl enable ssh --now`
3. `sudo ufw disable` (disable firewall - safe for local network, otherwise `sudo ufw allow ssh`)
4. `ip a` - find ip address
5. ssh username@<ip_address>

17
controller.py Normal file
View File

@@ -0,0 +1,17 @@
import numpy as np
from PIL import Image
from simulator import EmulatorSimulator
class Controller:
def __init__(self, vm_name: str, username: str, password: str, host: str) -> None:
self.simulator = EmulatorSimulator(vm_name=vm_name, username=username,
password=password, host=host)
def get_state(self) -> np.ndarray:
image_path = self.simulator.get_screenshot()
with Image.open(image_path) as img:
return np.array(img)
def step(self) -> None:
self.simulator.keyboard_type("hello word")

9
main.py Normal file
View File

@@ -0,0 +1,9 @@
from controller import Controller
controller = Controller(vm_name="KUbuntu-23.10", username="username", password="password", host="192.168.56.101")
input("enter to continue")
img = controller.get_state()
print(img)
input("enter to continue")
controller.step()

View File

@@ -1 +1,3 @@
numpy numpy
Pillow
fabric

View File

@@ -1,14 +1,15 @@
import numpy as np
import subprocess import subprocess
from fabric import Connection
class EmulatorSimulator: class EmulatorSimulator:
def __init__(self, vm_name: str, username: str, password: str, snapshot_name: str = "snapshot"): def __init__(self, vm_name: str, username: str, password: str, host:str, snapshot_name: str = "snapshot"):
self.vm_name = vm_name self.vm_name = vm_name
self.username = username self.username = username
self.password = password self.password = password
self.host = host
self.snapshot_name = snapshot_name self.snapshot_name = snapshot_name
self.execute_command(["VBoxManage", "startvm", self.vm_name]) self.ssh_connection = Connection(host=self.host, user=self.username, connect_kwargs={"password": password})
pass self._execute_command(["VBoxManage", "startvm", self.vm_name])
def _execute_command(self, command: list[str]) -> None: def _execute_command(self, command: list[str]) -> None:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -21,10 +22,8 @@ class EmulatorSimulator:
return stdout.decode() return stdout.decode()
def _execute_xdotool_command(self, command: list[str]) -> None: def _execute_xdotool_command(self, command: list[str]) -> None:
command_prefix = ["VBoxManage", "guestcontrol", self.vm_name, "run", result = self.ssh_connection.run(f"DISPLAY=:0 xdotool {command}", hide=True)
"--exe", "/usr/bin/xdotool", "--username", self.username, return result.stdout.strip()
"--password", self.password, "--"]
self._execute_command(command_prefix + command)
def get_logs(self) -> str: def get_logs(self) -> str:
pass pass
@@ -36,16 +35,21 @@ class EmulatorSimulator:
self._execute_command(["VBoxManage", "snapshot", self.vm_name, "take", self.snapshot_name]) self._execute_command(["VBoxManage", "snapshot", self.vm_name, "take", self.snapshot_name])
def send_click(self, click: int) -> None: def send_click(self, click: int) -> None:
self._execute_xdotool_command(["click", str(click)]) self._execute_xdotool_command(f"click {click}")
def send_key(self) -> None: def send_key(self) -> None:
pass pass
def mouse_move(self, x: int, y: int) -> None: def mouse_move(self, x: int, y: int) -> None:
self._execute_xdotool_command(["mousemove", str(x), str(y),]) self._execute_xdotool_command(f"mousemove {x} {y}")
def get_screenshot(self) -> np.ndarray: def keyboard_type(self, text: str) -> None:
self._execute_command(["VBoxManage", "controlvm", self.vm_name, "screenshotpng", "./"]) self._execute_xdotool_command(f"type {text}")
def get_screenshot(self) -> str:
image_path = "./screenshot.png"
self._execute_command(["VBoxManage", "controlvm", self.vm_name, "screenshotpng", image_path])
return image_path
def close(self) -> None: def close(self) -> None:
pass pass