Files
sci-gui-agent-benchmark/annotation/ducktrack/obs_client.py

200 lines
7.3 KiB
Python

import os
import shutil
import subprocess
import time
from platform import system

import obsws_python as obs
import psutil
def is_obs_running() -> bool:
    """
    Report whether a process whose name contains "obs" is currently running.

    The match is a case-insensitive substring test on the process name, so it
    is deliberately loose (any process with "obs" in its name counts).

    Returns:
        True if at least one matching process is found, False otherwise.

    Raises:
        Exception: if the process list could not be inspected at all. The
            underlying error is attached as ``__cause__``.
    """
    try:
        for process in psutil.process_iter(attrs=["pid", "name"]):
            # psutil fills in None for attributes it was not allowed to read
            # (e.g. access denied / zombie processes); skip those instead of
            # failing the whole check on a single unreadable process.
            name = process.info["name"]
            if name and "obs" in name.lower():
                return True
        return False
    except Exception as err:
        # `except Exception` (not a bare except) so Ctrl-C / SystemExit still
        # propagate untouched.
        raise Exception("Could not check if OBS is running already. Please check manually.") from err
def close_obs(obs_process: subprocess.Popen):
    """
    Shut down an OBS process previously started via ``subprocess.Popen``.

    Asks the process to terminate, gives it five seconds to exit, and
    force-kills it if it has not exited by then.

    Args:
        obs_process: handle of the process to stop. A falsy value (e.g. None)
            is accepted and makes the call a no-op.
    """
    if not obs_process:
        return

    obs_process.terminate()
    try:
        obs_process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        obs_process.kill()
        # Reap the killed child so it does not linger as a zombie and so that
        # its returncode is populated.
        obs_process.wait()
def find_obs() -> str:
    """
    Locate the OBS executable.

    The well-known install locations for the current platform are checked
    first; if none of them exists, ``obs`` is looked up on PATH.

    Returns:
        The path of the first candidate that exists, or the bare command name
        "obs" when nothing was found (leaving resolution to whoever launches it).
    """
    common_paths = {
        "Windows": [
            "C:\\Program Files\\obs-studio\\bin\\64bit\\obs64.exe",
            "C:\\Program Files (x86)\\obs-studio\\bin\\32bit\\obs32.exe",
        ],
        "Darwin": [
            "/Applications/OBS.app/Contents/MacOS/OBS",
            "/opt/homebrew/bin/obs",
        ],
        "Linux": [
            "/usr/bin/obs",
            "/usr/local/bin/obs",
        ],
    }

    for path in common_paths.get(system(), []):
        if os.path.exists(path):
            return path

    # shutil.which does the PATH lookup in-process: no shell is spawned, and
    # it returns a single path (or None) rather than the possibly multi-line
    # output of `where` / `which`.
    return shutil.which("obs") or "obs"
def open_obs() -> subprocess.Popen:
    """
    Launch OBS with the replay buffer started and minimized to the tray.

    Returns:
        The ``subprocess.Popen`` handle of the started OBS process.

    Raises:
        Exception: if OBS could not be located or started. The underlying
            error is attached as ``__cause__``.
    """
    try:
        obs_path = find_obs()

        working_dir = None
        if system() == "Windows":
            # OBS has to start from its own directory to find the correct
            # locale on Windows. Only the child needs that working directory,
            # so pass it via `cwd` rather than os.chdir(), which would move
            # the calling process as well.
            working_dir = os.path.dirname(obs_path) or None

        return subprocess.Popen(
            [obs_path, "--startreplaybuffer", "--minimize-to-tray"],
            cwd=working_dir,
        )
    except Exception as err:
        raise Exception("Failed to find OBS, please open OBS manually.") from err
class OBSClient:
    """
    Thin controller around a running OBS instance, driven over the OBS websocket.

    Constructing it switches OBS to the "computer_tracker" profile and writes
    the video / output settings used for recording; the remaining methods
    start, stop, pause and resume the recording.
    """

    def __init__(
        self,
        recording_path: str,
        metadata: dict,
        fps=30,
        output_width=1280,
        output_height=720,
    ):
        self.metadata = metadata

        self.req_client = obs.ReqClient()
        self.event_client = obs.EventClient()

        # output state -> list of time.perf_counter() stamps at which OBS
        # reported that state
        self.record_state_events = {}

        # NOTE(review): obsws_python appears to match callbacks to events by
        # function name, so this name should not be changed - confirm against
        # the library docs before renaming.
        def on_record_state_changed(data):
            state = data.output_state
            print("record state changed:", state)
            self.record_state_events.setdefault(state, []).append(time.perf_counter())

        self.event_client.callback.register(on_record_state_changed)

        # remembered so stop_recording() can switch back afterwards
        self.old_profile = self.req_client.get_profile_list().current_profile_name

        profile_exists = "computer_tracker" in self.req_client.get_profile_list().profiles
        if profile_exists:
            # select the existing profile, create and remove a throwaway
            # "temp" profile, then select "computer_tracker" again
            self.req_client.set_current_profile("computer_tracker")
            self.req_client.create_profile("temp")
            self.req_client.remove_profile("temp")
            self.req_client.set_current_profile("computer_tracker")
        else:
            self.req_client.create_profile("computer_tracker")

        base_width = metadata["screen_width"]
        base_height = metadata["screen_height"]
        if metadata["system"] == "Darwin":
            # for retina displays
            # TODO: check if external displays are messed up by this
            base_width, base_height = base_width * 2, base_height * 2

        scaled_width, scaled_height = _scale_resolution(base_width, base_height, output_width, output_height)

        set_param = self.req_client.set_profile_parameter
        base_resolution = f"{base_width}x{base_height}"

        # canvas, scaling, frame-rate and container settings, sent in this order
        video_settings = (
            ("Video", "BaseCX", str(base_width)),
            ("Video", "BaseCY", str(base_height)),
            ("Video", "OutputCX", str(scaled_width)),
            ("Video", "OutputCY", str(scaled_height)),
            ("Video", "ScaleType", "lanczos"),
            ("AdvOut", "RescaleRes", base_resolution),
            ("AdvOut", "RecRescaleRes", base_resolution),
            ("AdvOut", "FFRescaleRes", base_resolution),
            ("Video", "FPSCommon", str(fps)),
            ("Video", "FPSInt", str(fps)),
            ("Video", "FPSNum", str(fps)),
            ("Video", "FPSDen", "1"),
            ("SimpleOutput", "RecFormat2", "mp4"),
        )
        for category, name, value in video_settings:
            set_param(category, name, value)

        # Mbps -> kbps, truncated down to a multiple of 50
        bitrate = int(_get_bitrate_mbps(scaled_width, scaled_height, fps=fps) * 1000 / 50) * 50
        set_param("SimpleOutput", "VBitrate", str(bitrate))

        # do this in order to get pause & resume
        set_param("SimpleOutput", "RecQuality", "Small")

        set_param("SimpleOutput", "FilePath", recording_path)

        # TODO: not all OBS configs have this, maybe just instruct the user to mute themselves
        try:
            self.req_client.set_input_mute("Mic/Aux", muted=True)
        except obs.error.OBSSDKRequestError:
            # In case there is no Mic/Aux input, this will throw an error
            pass

    def start_recording(self):
        """Ask OBS to start recording."""
        self.req_client.start_record()

    def stop_recording(self):
        """Ask OBS to stop recording, then switch back to the profile that was active before."""
        self.req_client.stop_record()
        self.req_client.set_current_profile(self.old_profile)  # restore old profile

    def pause_recording(self):
        """Ask OBS to pause the current recording."""
        self.req_client.pause_record()

    def resume_recording(self):
        """Ask OBS to resume a paused recording."""
        self.req_client.resume_record()
def _get_bitrate_mbps(width: int, height: int, fps=30) -> float:
    """
    Gets the YouTube recommended bitrate in Mbps for a given resolution and framerate.
    Refer to https://support.google.com/youtube/answer/1722171?hl=en#zippy=%2Cbitrate

    Only two frame-rate tiers exist: 30 fps and 60 fps. Any fps other than 30
    is treated as the 60 fps tier, both for the lookup table and for the
    linear approximation, so a number is always returned.

    Args:
        width: output width in pixels.
        height: output height in pixels.
        fps: target frame rate.

    Returns:
        The bitrate in Mbps: the tabulated value when (width, height) is in
        the table, otherwise a linear estimate based on the pixel count.
    """
    resolutions = {
        (7680, 4320): {30: 120, 60: 180},
        (3840, 2160): {30: 40, 60: 60.5},
        (2160, 1440): {30: 16, 60: 24},
        (1920, 1080): {30: 8, 60: 12},
        (1280, 720): {30: 5, 60: 7.5},
        (640, 480): {30: 2.5, 60: 4},
        (480, 360): {30: 1, 60: 1.5},
    }
    # (multiplier, constant) of the linear model, per frame-rate tier
    linear_model = {
        30: (3.5982188179592543e-06, 2.418399836285939),
        60: (5.396175171097084e-06, 3.742780056500365),
    }

    # Previously a tabulated resolution with e.g. fps=24 returned None (and the
    # caller crashed multiplying it); use the same tiering as the linear model.
    tier = 30 if fps == 30 else 60

    if (width, height) in resolutions:
        return resolutions[(width, height)][tier]

    # approximate the bitrate using a simple linear model
    area = width * height
    multiplier, constant = linear_model[tier]
    return multiplier * area + constant
def _scale_resolution(base_width: int, base_height: int, target_width: int, target_height: int) -> tuple[int, int]:
    """
    Fit the base aspect ratio into the pixel budget of the target resolution.

    The result keeps (approximately) the base width/height ratio while covering
    at most about ``target_width * target_height`` pixels. Both dimensions are
    truncated to integers, height first, then width derived from it.

    Returns:
        A ``(width, height)`` tuple.
    """
    ratio = base_width / base_height
    pixel_budget = target_width * target_height

    # height solves  ratio * height**2 == pixel_budget
    height = int((pixel_budget / ratio) ** 0.5)
    width = int(ratio * height)
    return width, height