Merge remote-tracking branch 'upstream/main'
@@ -53,7 +53,8 @@ from .docs import (
     compare_docx_files_and_ignore_new_lines,
     compare_docx_images,
     compare_image_text,
-    compare_references
+    compare_references,
+    compare_unique_train_records
 )
 from .general import (
     check_csv,
@@ -167,8 +167,12 @@ def compare_docx_files(file1, file2, **options):
         if ignore_case:
             p1, p2 = p1.lower(), p2.lower()
         if p1 != p2:
-            print(p1)
-            print(p2)
+            # show the difference
+            print("=== First Paragraph ===")
+            print(f"\033[92m{repr(p1)}\033[0m")  # Green color for p1; repr() shows hidden chars
+            print("=== Second Paragraph ===")
+            print(f"\033[91m{repr(p2)}\033[0m")  # Red color for p2; repr() shows hidden chars
+            print("=" * 50)  # Clear boundary
             return 0

     return 1
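The colored output added above relies on standard ANSI escape codes (92 = bright green, 91 = bright red) plus repr() to expose invisible characters. A standalone sketch of the same technique, with made-up paragraph values:

    p1 = "Hello world"
    p2 = "Hello\u00a0world"  # invented example: non-breaking space, invisible to plain print()
    print(f"\033[92m{p1!r}\033[0m")  # green; repr() reveals the hidden character
    print(f"\033[91m{p2!r}\033[0m")  # red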
@@ -886,3 +890,72 @@ def compare_references(file1, file2, **options):
         return (result - reference_base_result) / (1 - reference_base_result)
     else:
         return 0
+
+
+def compare_unique_train_records(processed_file, expected_files, **kwargs):
+    """
+    Compares the processed file with a list of expected files containing the
+    gold standard and the initial document.
+    expected_files[0] should be the gold standard file.
+    expected_files[1] should be the initial file.
+    """
+    # Debug logging to understand what we're actually receiving
+    logger.info(f"DEBUG: processed_file type: {type(processed_file)}, value: {processed_file}")
+    logger.info(f"DEBUG: expected_files type: {type(expected_files)}, value: {expected_files}")
+    logger.info(f"DEBUG: kwargs: {kwargs}")
+
+    if not processed_file or not isinstance(expected_files, list) or len(expected_files) < 2:
+        logger.error("Invalid arguments: processed_file and a list of 2 expected_files are required.")
+        return 0
+
+    gold_file = expected_files[0]
+    initial_file = expected_files[1]
+
+    if not gold_file or not initial_file:
+        logger.error("Gold file or initial file path is missing from expected_files list.")
+        return 0
+
+    # Helper to get the non-empty lines and the train IDs from a file
+    def get_lines_and_ids_from_file(file_path):
+        try:
+            doc = Document(file_path)
+            lines = [p.text.strip() for p in doc.paragraphs if p.text.strip()]
+            train_ids = [line.split(',')[1].strip() for line in lines if len(line.split(',')) == 4]
+            return lines, train_ids
+        except Exception as e:
+            logger.error(f"Error opening or parsing file {file_path}: {e}")
+            return None, None
+
+    # Get data from all three files
+    processed_lines, processed_train_ids = get_lines_and_ids_from_file(processed_file)
+    if processed_lines is None: return 0
+
+    gold_lines, gold_train_ids = get_lines_and_ids_from_file(gold_file)
+    if gold_lines is None: return 0
+
+    initial_lines, _ = get_lines_and_ids_from_file(initial_file)
+    if initial_lines is None: return 0
+    initial_lines_set = set(initial_lines)
+
+    # 1. Subset check: ensure every processed line was in the initial file
+    if not set(processed_lines).issubset(initial_lines_set):
+        logger.error("Processed file contains lines not present in the initial file.")
+        logger.error(f"Extra lines: {set(processed_lines) - initial_lines_set}")
+        return 0
+
+    # 2. Uniqueness check: no duplicate train_ids within the processed file
+    if len(processed_train_ids) != len(set(processed_train_ids)):
+        logger.error("Duplicate train_ids found in the processed file.")
+        return 0
+
+    # 3. Correctness check: compare the set of train_ids against the gold file
+    if set(processed_train_ids) != set(gold_train_ids):
+        logger.error("Set of train_ids does not match between processed file and gold file.")
+        return 0
+
+    # 4. Line count check
+    if len(processed_lines) != len(gold_lines):
+        logger.error("Number of lines does not match between processed file and gold file.")
+        return 0
+
+    return 1
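A hypothetical call sketch for the new checker (the file names are invented; Document is assumed to be python-docx's, as used in the helper). It returns 1 only when every processed line came from the initial file and the train_ids are unique and match the gold set:

    score = compare_unique_train_records(
        "processed_records.docx",                       # output under test
        ["gold_records.docx", "initial_records.docx"],  # [gold standard, initial document]
    )
    assert score in (0, 1)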
@@ -3,6 +3,7 @@ import os
 import subprocess
 from typing import Dict
 from xml.etree import ElementTree
+from urllib.parse import urlparse

 import acoustid
 import cv2
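The new urlparse import backs the URL checks below; for reference, it splits an (invented) stream URL into exactly the pieces those checks compare:

    from urllib.parse import urlparse
    import os
    u = urlparse("http://example.com/live/playlist.m3u8")
    assert u.netloc == "example.com" and u.path == "/live/playlist.m3u8"
    assert os.path.basename(u.path) == "playlist.m3u8"  # the bare name VLC often shows for HLS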
@@ -26,15 +27,108 @@ def is_vlc_playing(actual_status_path: str, rule: Dict[str, str]) -> float:

     tree = ElementTree.fromstring(actual_status)
     status = tree.find('state').text
+    logger.info(f"VLC Status: {status}")
     if status == 'playing':
         if rule['type'] == 'file_name':
-            file_info = tree.find('information/category[@name="meta"]/info[@name="filename"]').text
-            return 1 if file_info.endswith(rule['file_name']) else 0
+            # Try multiple possible paths for file information in VLC's status XML
+            file_paths = [
+                'information/category[@name="meta"]/info[@name="filename"]',
+                'information/category[@name="meta"]/info[@name="title"]',
+                'information/category[@name="meta"]/info[@name="uri"]',
+                'information/category[@name="meta"]/info[@name="location"]',
+                'information/category[@name="meta"]/info[@name="name"]'
+            ]
+
+            file_info = None
+            for path in file_paths:
+                element = tree.find(path)
+                if element is not None and element.text:
+                    file_info = element.text
+                    break
+
+            if file_info:
+                expected_filename = rule['file_name']
+
+                # Method 1: direct filename match (most precise)
+                actual_basename = os.path.basename(file_info)
+                if actual_basename == expected_filename:
+                    return 1
+
+                # Method 2: endswith match (for backward compatibility)
+                if file_info.endswith(expected_filename):
+                    return 1
+
+                # Method 3: for paths, check that the expected filename appears in the
+                # path and is actually the filename, not just part of a directory name
+                if expected_filename in file_info:
+                    if file_info.endswith('/' + expected_filename) or file_info.endswith('\\' + expected_filename):
+                        return 1
+
+                logger.warning(f"File name mismatch - Expected: {expected_filename}, Found: {file_info}")
+                return 0
+            else:
+                logger.warning(f"Could not find file information in VLC status XML for rule: {rule}")
+                return 0
         elif rule['type'] == 'url':
-            file_info = tree.find('information/category[@name="meta"]/info[@name="url"]').text
-            return 1 if file_info.endswith(rule['url']) else 0
+            # Try multiple possible paths for URL information in VLC's status XML
+            url_paths = [
+                'information/category[@name="meta"]/info[@name="url"]',
+                'information/category[@name="meta"]/info[@name="URI"]',
+                'information/category[@name="meta"]/info[@name="location"]',
+                'information/category[@name="meta"]/info[@name="title"]',     # sometimes the URL is in the title for streams
+                'information/category[@name="meta"]/info[@name="filename"]',  # sometimes the URL is in the filename for streams
+                'information/category[@name="Stream 0"]/info[@name="Codec"]', # try stream info as a last resort
+                'information/category[@name="Stream 0"]/info[@name="Type"]',
+                'information/category[@name="Stream 0"]/info[@name="Language"]'
+            ]
+
+            file_info = None
+            logger.debug(f"Looking for URL: {rule['url']}")
+
+            for path in url_paths:
+                element = tree.find(path)
+                if element is not None and element.text:
+                    file_info = element.text
+                    logger.debug(f"Found URL info at '{path}': {file_info}")
+                    break
+
+            if file_info:
+                # For URL comparison, check whether the rule URL is contained in
+                # file_info; this handles VLC showing a longer or modified URL
+                expected_url = rule['url']
+
+                # Method 1: direct URL match
+                if expected_url in file_info or file_info.endswith(expected_url):
+                    return 1
+
+                # Method 2: for HLS streams, VLC often shows just the filename instead
+                # of the full URL, so compare against the filename part of the URL
+                try:
+                    expected_parsed = urlparse(expected_url)
+                    expected_filename = os.path.basename(expected_parsed.path)
+
+                    # If VLC shows just the filename (common for HLS streams)
+                    if file_info == expected_filename:
+                        logger.info(f"URL filename match - Expected URL: {expected_url}, VLC shows filename: {file_info}")
+                        return 1
+
+                    # Method 3: both are URLs from the same domain with a similar path
+                    if '://' in file_info:  # file_info is also a URL
+                        actual_parsed = urlparse(file_info)
+                        if (expected_parsed.netloc == actual_parsed.netloc and
+                                expected_parsed.path in actual_parsed.path):
+                            return 1
+                except Exception as e:
+                    logger.debug(f"URL parsing error: {e}")
+
+                logger.warning(f"URL mismatch - Expected: {expected_url}, Found: {file_info}")
+                return 0
+            else:
+                logger.warning(f"Could not find URL information in VLC status XML for rule: {rule}")
+                return 0
         else:
             logger.error(f"Unknown type: {rule['type']}")
             return 0
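For reference, a hand-written sketch (not captured from a real VLC instance) of the status-XML shape these lookups expect; VLC's web interface serves the real document at /requests/status.xml:

    from xml.etree import ElementTree

    sample = (
        '<root>'
        '<state>playing</state>'
        '<information><category name="meta">'
        '<info name="filename">video.mp4</info>'
        '</category></information>'
        '</root>'
    )
    tree = ElementTree.fromstring(sample)
    assert tree.find('state').text == 'playing'
    assert tree.find('information/category[@name="meta"]/info[@name="filename"]').text == 'video.mp4'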
@@ -130,33 +224,67 @@ def compare_audios(audio_path_1, audio_path_2):
     Compare two audio files and return a similarity score in the range [0, 1].
     audio_path_1, audio_path_2: paths to the audio files to compare
     """
     # Example usage:
     # similarity = compare_audios('path_to_audio1.mp3', 'path_to_audio2.mp3')
     # print(f'Similarity Score: {similarity}')

     # Convert to a common format if necessary and load the audio
     if not audio_path_1 or not audio_path_2:
         return 0

-    # Load the audio files and extract MFCC features
-    y1, sr1 = librosa.load(audio_path_1)
-    mfcc1 = librosa.feature.mfcc(y=y1, sr=sr1)
+    # Load the audio files, tolerating empty or corrupt inputs
+    y1, y2 = None, None
+    try:
+        y1, sr1 = librosa.load(audio_path_1)
+    except Exception:
+        logger.warning(f"Could not load audio from {os.path.basename(audio_path_1)}. It might be empty or corrupt.")

-    y2, sr2 = librosa.load(audio_path_2)
-    mfcc2 = librosa.feature.mfcc(y=y2, sr=sr2)
+    try:
+        y2, sr2 = librosa.load(audio_path_2)
+    except Exception:
+        logger.warning(f"Could not load audio from {os.path.basename(audio_path_2)}. It might be empty or corrupt.")
+
+    # Handle cases where one or both audio files are empty or corrupt
+    is_y1_bad = (y1 is None) or (y1.shape[0] == 0)
+    is_y2_bad = (y2 is None) or (y2.shape[0] == 0)
+
+    if is_y1_bad and is_y2_bad:
+        logger.info("Both audio files are empty or corrupt. Considering them perfectly similar.")
+        return 1.0
+
+    if is_y1_bad or is_y2_bad:
+        logger.warning("One audio file is empty/corrupt, the other is not. Similarity is 0.")
+        return 0.0
+
+    try:
+        logger.info(f"Audio 1 ({os.path.basename(audio_path_1)}): sr={sr1}, len={len(y1)}")
+        logger.info(f"Audio 2 ({os.path.basename(audio_path_2)}): sr={sr2}, len={len(y2)}")
+
+        # Extract MFCC features
+        mfcc1 = librosa.feature.mfcc(y=y1, sr=sr1)
+        mfcc2 = librosa.feature.mfcc(y=y2, sr=sr2)
+    except Exception as e:
+        logger.error(f"Error during MFCC extraction: {e}")
+        return 0.0

     # Normalize the MFCC features
     mfcc1 = librosa.util.normalize(mfcc1, axis=1)
     mfcc2 = librosa.util.normalize(mfcc2, axis=1)
+    logger.info("MFCCs normalized.")

     # Define a lambda function to compute cosine distance
     dist_func = lambda x, y: cosine(x, y)

     # Use the DTW algorithm to find the best alignment path
     distance, path = fastdtw(mfcc1.T, mfcc2.T, dist=dist_func)
+    logger.info(f"DTW distance: {distance:.4f}, Path length: {len(path)}")

-    # Calculate the similarity score; use 1/(1+distance) to convert distance to similarity
-    similarity = 1 / (1 + distance)
+    # Normalize the DTW distance by the length of the alignment path
+    if len(path) == 0:
+        normalized_distance = np.inf
+    else:
+        normalized_distance = distance / len(path)
+    logger.info(f"Normalized DTW distance: {normalized_distance:.4f}")
+
+    # Convert the normalized distance to a similarity score via exponential decay
+    similarity = np.exp(-normalized_distance)

     return similarity
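The rescoring change matters because raw DTW distance grows with clip length, so 1/(1 + distance) systematically punished long audio; dividing by the alignment-path length and applying exponential decay keeps scores in [0, 1] and comparable across lengths. A small numeric sketch with made-up distances:

    import numpy as np

    # same per-step distance, very different clip lengths
    for distance, path_len in [(5.0, 100), (50.0, 1000)]:
        old = 1 / (1 + distance)             # 0.167 vs 0.020: length-biased
        new = np.exp(-distance / path_len)   # ~0.951 in both cases
        print(f"old={old:.3f} new={new:.3f}")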
@@ -204,32 +332,6 @@ def compare_videos(video_path1, video_path2, max_frames_to_check=100, threshold=
     return 1.


-def are_audio_files_similar(mp3_file_path, mp4_file_path):
-    # Extract audio fingerprint from the MP3 file
-    mp3_fingerprint, mp3_duration = acoustid.fingerprint_file(mp3_file_path)
-
-    # Extract the audio stream from the MP4 file
-    mp4_audio_path = os.path.splitext(mp4_file_path)[0] + '_extracted.mp3'
-    try:
-        subprocess.run(["ffmpeg", "-i", mp4_file_path, "-vn", "-ar", "44100", "-ac", "2", "-ab", "192k", "-f", "mp3",
-                        mp4_audio_path], check=True)
-    except subprocess.CalledProcessError as e:
-        print(f"An error occurred during audio extraction from MP4: {e}")
-        return 0.
-
-    # Extract audio fingerprint from the extracted audio
-    mp4_fingerprint, mp4_duration = acoustid.fingerprint_file(mp4_audio_path)
-
-    # Clean up the temporary extracted audio file
-    os.remove(mp4_audio_path)
-
-    # Compare fingerprints (rudimentary comparison)
-    if mp3_duration >= mp4_duration and mp3_fingerprint == mp4_fingerprint:
-        return 1.
-
-    return 0.
-
-
 def check_qt_bgcone(actual_config_path, rule):
     with open(actual_config_path, 'rb') as file:
         config_file = file.read().decode('utf-8')
@@ -58,16 +58,20 @@ def check_json_settings(actual: str, expected: str, **options) -> float:
     if not actual:
         return 0.

-    with open(actual, 'r') as f:
-        data = json.load(f)
+    try:
+        with open(actual, 'r') as f:
+            data = json.load(f)
+    except Exception:
+        return 0.0

     expect = expected['expected']
-    data_copy = copy.deepcopy(data)
-    data_copy.update(expect)
-    if data == data_copy:
-        return 1.0
-    else:
-        return 0.0
+
+    # Check if all expected key-value pairs are in the actual data
+    for key, value in expect.items():
+        if key not in data or data[key] != value:
+            return 0.0
+
+    return 1.0


 def compare_text_file(actual: str, expected: str, **options) -> float:
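The old deepcopy-and-update comparison and the new per-key loop test the same condition, namely that every expected key is present in the actual settings with an equal value, but the loop drops the copy dependency and short-circuits on the first mismatch. A quick sketch with invented settings:

    data = {"theme": "dark", "fontSize": 12}  # parsed settings file
    expect = {"theme": "dark"}                # expected subset
    assert all(k in data and data[k] == v for k, v in expect.items())  # scores 1.0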