import os import subprocess import time from platform import system import obsws_python as obs import psutil def is_obs_running() -> bool: try: for process in psutil.process_iter(attrs=["pid", "name"]): if "obs" in process.info["name"].lower(): return True return False except: raise Exception("Could not check if OBS is running already. Please check manually.") def close_obs(obs_process: subprocess.Popen): if obs_process: obs_process.terminate() try: obs_process.wait(timeout=5) except subprocess.TimeoutExpired: obs_process.kill() def find_obs() -> str: common_paths = { "Windows": [ "C:\\Program Files\\obs-studio\\bin\\64bit\\obs64.exe", "C:\\Program Files (x86)\\obs-studio\\bin\\32bit\\obs32.exe" ], "Darwin": [ "/Applications/OBS.app/Contents/MacOS/OBS", "/opt/homebrew/bin/obs" ], "Linux": [ "/usr/bin/obs", "/usr/local/bin/obs" ] } for path in common_paths.get(system(), []): if os.path.exists(path): return path try: if system() == "Windows": obs_path = subprocess.check_output("where obs", shell=True).decode().strip() else: obs_path = subprocess.check_output("which obs", shell=True).decode().strip() if os.path.exists(obs_path): return obs_path except subprocess.CalledProcessError: pass return "obs" def open_obs() -> subprocess.Popen: try: obs_path = find_obs() if system() == "Windows": # you have to change the working directory first for OBS to find the correct locale on windows os.chdir(os.path.dirname(obs_path)) obs_path = os.path.basename(obs_path) return subprocess.Popen([obs_path, "--startreplaybuffer", "--minimize-to-tray"]) except: raise Exception("Failed to find OBS, please open OBS manually.") class OBSClient: """ Controls the OBS client via the OBS websocket. Sets all the correct settings for recording. """ def __init__( self, recording_path: str, metadata: dict, fps=30, output_width=1280, output_height=720, ): self.metadata = metadata self.req_client = obs.ReqClient() self.event_client = obs.EventClient() self.record_state_events = {} def on_record_state_changed(data): output_state = data.output_state print("record state changed:", output_state) if output_state not in self.record_state_events: self.record_state_events[output_state] = [] self.record_state_events[output_state].append(time.perf_counter()) self.event_client.callback.register(on_record_state_changed) self.old_profile = self.req_client.get_profile_list().current_profile_name if "computer_tracker" not in self.req_client.get_profile_list().profiles: self.req_client.create_profile("computer_tracker") else: self.req_client.set_current_profile("computer_tracker") self.req_client.create_profile("temp") self.req_client.remove_profile("temp") self.req_client.set_current_profile("computer_tracker") base_width = metadata["screen_width"] base_height = metadata["screen_height"] if metadata["system"] == "Darwin": # for retina displays # TODO: check if external displays are messed up by this base_width *= 2 base_height *= 2 scaled_width, scaled_height = _scale_resolution(base_width, base_height, output_width, output_height) self.req_client.set_profile_parameter("Video", "BaseCX", str(base_width)) self.req_client.set_profile_parameter("Video", "BaseCY", str(base_height)) self.req_client.set_profile_parameter("Video", "OutputCX", str(scaled_width)) self.req_client.set_profile_parameter("Video", "OutputCY", str(scaled_height)) self.req_client.set_profile_parameter("Video", "ScaleType", "lanczos") self.req_client.set_profile_parameter("AdvOut", "RescaleRes", f"{base_width}x{base_height}") self.req_client.set_profile_parameter("AdvOut", "RecRescaleRes", f"{base_width}x{base_height}") self.req_client.set_profile_parameter("AdvOut", "FFRescaleRes", f"{base_width}x{base_height}") self.req_client.set_profile_parameter("Video", "FPSCommon", str(fps)) self.req_client.set_profile_parameter("Video", "FPSInt", str(fps)) self.req_client.set_profile_parameter("Video", "FPSNum", str(fps)) self.req_client.set_profile_parameter("Video", "FPSDen", "1") self.req_client.set_profile_parameter("SimpleOutput", "RecFormat2", "mp4") bitrate = int(_get_bitrate_mbps(scaled_width, scaled_height, fps=fps) * 1000 / 50) * 50 self.req_client.set_profile_parameter("SimpleOutput", "VBitrate", str(bitrate)) # do this in order to get pause & resume self.req_client.set_profile_parameter("SimpleOutput", "RecQuality", "Small") self.req_client.set_profile_parameter("SimpleOutput", "FilePath", recording_path) # TODO: not all OBS configs have this, maybe just instruct the user to mute themselves try: self.req_client.set_input_mute("Mic/Aux", muted=True) except obs.error.OBSSDKRequestError : # In case there is no Mic/Aux input, this will throw an error pass def start_recording(self): self.req_client.start_record() def stop_recording(self): self.req_client.stop_record() self.req_client.set_current_profile(self.old_profile) # restore old profile def pause_recording(self): self.req_client.pause_record() def resume_recording(self): self.req_client.resume_record() def _get_bitrate_mbps(width: int, height: int, fps=30) -> float: """ Gets the YouTube recommended bitrate in Mbps for a given resolution and framerate. Refer to https://support.google.com/youtube/answer/1722171?hl=en#zippy=%2Cbitrate """ resolutions = { (7680, 4320): {30: 120, 60: 180}, (3840, 2160): {30: 40, 60: 60.5}, (2160, 1440): {30: 16, 60: 24}, (1920, 1080): {30: 8, 60: 12}, (1280, 720): {30: 5, 60: 7.5}, (640, 480): {30: 2.5, 60: 4}, (480, 360): {30: 1, 60: 1.5} } if (width, height) in resolutions: return resolutions[(width, height)].get(fps) else: # approximate the bitrate using a simple linear model area = width * height multiplier = 3.5982188179592543e-06 if fps == 30 else 5.396175171097084e-06 constant = 2.418399836285939 if fps == 30 else 3.742780056500365 return multiplier * area + constant def _scale_resolution(base_width: int, base_height: int, target_width: int, target_height: int) -> tuple[int, int]: target_area = target_width * target_height aspect_ratio = base_width / base_height scaled_height = int((target_area / aspect_ratio) ** 0.5) scaled_width = int(aspect_ratio * scaled_height) return scaled_width, scaled_height