"""Round-robin HTTP(S)/SOCKS proxy pool with failure tracking and cooldown."""

import json
import logging
import random  # noqa: F401  (unused here; kept so existing importers are unaffected)
import time
from dataclasses import dataclass
from threading import Lock
from typing import Dict, List, Optional
from urllib.parse import quote

import requests

logger = logging.getLogger("desktopenv.providers.aws.ProxyPool")
logger.setLevel(logging.INFO)


@dataclass
class ProxyInfo:
    """Connection details and health bookkeeping for a single proxy."""

    host: str
    port: int
    username: Optional[str] = None
    password: Optional[str] = None
    protocol: str = "http"  # http, https, socks5
    failed_count: int = 0  # failures recorded since the last success/reset
    last_used: float = 0  # time.time() when the pool last handed this proxy out
    is_active: bool = True  # set to False to exclude the proxy from selection
    last_failed: float = 0  # time.time() of the most recent recorded failure


class ProxyPool:
    """Pool of proxies handed out round-robin, skipping unhealthy ones.

    A proxy whose ``failed_count`` reaches ``max_failures`` is skipped until
    ``cooldown_time`` seconds have passed, after which its failure count is
    reset and it is eligible again.
    """

    def __init__(self, config_file: Optional[str] = None):
        """Create an empty pool, optionally loading proxies from a JSON file.

        Args:
            config_file: Path to a JSON file containing a list of proxy
                objects; see ``load_proxies_from_file``. Skipped when falsy.
        """
        self.proxies: List[ProxyInfo] = []
        self.current_index = 0
        self.lock = Lock()
        self.max_failures = 3  # failures before a proxy is put on cooldown
        self.cooldown_time = 300  # cooldown length in seconds (5 minutes)

        if config_file:
            self.load_proxies_from_file(config_file)

    def load_proxies_from_file(self, config_file: str):
        """Load a list of proxies from a JSON configuration file.

        Each entry must provide ``host`` and ``port`` and may provide
        ``username``, ``password`` and ``protocol`` (default ``"http"``).

        Loading is best-effort: any error is logged instead of raised, and
        entries parsed before the error stay in the pool.

        Args:
            config_file: Path to the JSON file.
        """
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                proxy_configs = json.load(f)

            for config in proxy_configs:
                proxy = ProxyInfo(
                    host=config['host'],
                    port=config['port'],
                    username=config.get('username'),
                    password=config.get('password'),
                    protocol=config.get('protocol', 'http')
                )
                # Same locking discipline as add_proxy().
                with self.lock:
                    self.proxies.append(proxy)

            logger.info(f"Loaded {len(self.proxies)} proxies from {config_file}")
        except Exception as e:
            # Deliberately broad: a bad config file must not crash the caller.
            logger.error(f"Failed to load proxies from {config_file}: {e}")

    def add_proxy(self, host: str, port: int, username: Optional[str] = None,
                  password: Optional[str] = None, protocol: str = "http"):
        """Append a proxy to the pool.

        Args:
            host: Proxy host name or address.
            port: Proxy port.
            username: Optional user name for proxy authentication.
            password: Optional password for proxy authentication.
            protocol: URL scheme used to reach the proxy.
        """
        proxy = ProxyInfo(host=host, port=port, username=username,
                          password=password, protocol=protocol)
        with self.lock:
            self.proxies.append(proxy)
        logger.info(f"Added proxy {host}:{port}")

    def get_next_proxy(self) -> Optional[ProxyInfo]:
        """Return the next available proxy in round-robin order.

        Returns:
            The selected proxy with ``last_used`` updated, or ``None`` when
            the pool is empty or no proxy is currently available.
        """
        with self.lock:
            if not self.proxies:
                return None

            # Drop inactive proxies and those still cooling down.
            active_proxies = [p for p in self.proxies if self._is_proxy_available(p)]

            if not active_proxies:
                logger.warning("No active proxies available")
                return None

            # Round-robin over the currently available proxies.
            proxy = active_proxies[self.current_index % len(active_proxies)]
            self.current_index += 1
            proxy.last_used = time.time()

            return proxy

    def _is_proxy_available(self, proxy: ProxyInfo) -> bool:
        """Return whether ``proxy`` may be handed out right now.

        Side effect: once the cooldown of a failed proxy has elapsed, its
        ``failed_count`` is reset to 0. The callers in this class invoke this
        while holding ``self.lock``.
        """
        if not proxy.is_active:
            return False

        if proxy.failed_count >= self.max_failures:
            # Measure the cooldown from the latest failure. last_used is kept
            # as a fallback for failure counts that were bumped without going
            # through mark_proxy_failed(). Relying on last_used alone would let
            # a proxy that only ever failed in test_proxy() (last_used == 0)
            # skip the cooldown entirely.
            cooldown_start = max(proxy.last_used, proxy.last_failed)
            if time.time() - cooldown_start < self.cooldown_time:
                return False
            # Cooldown elapsed: give the proxy a fresh start.
            proxy.failed_count = 0

        return True

    def mark_proxy_failed(self, proxy: ProxyInfo):
        """Record one failure for ``proxy`` and timestamp it for the cooldown."""
        with self.lock:
            proxy.failed_count += 1
            proxy.last_failed = time.time()
            if proxy.failed_count >= self.max_failures:
                logger.warning(f"Proxy {proxy.host}:{proxy.port} marked as failed "
                               f"(failures: {proxy.failed_count})")

    def mark_proxy_success(self, proxy: ProxyInfo):
        """Record a success for ``proxy`` by clearing its failure count."""
        with self.lock:
            proxy.failed_count = 0

    def test_proxy(self, proxy: ProxyInfo, test_url: str = "http://httpbin.org/ip",
                   timeout: int = 10) -> bool:
        """Issue a GET through ``proxy`` and update its health accordingly.

        Args:
            proxy: Proxy to test.
            test_url: URL fetched through the proxy.
            timeout: Request timeout in seconds.

        Returns:
            ``True`` when the response status is 200; ``False`` on any other
            status or on any exception (which is logged at debug level).
        """
        try:
            proxies = self.get_proxy_dict(proxy)

            response = requests.get(test_url, proxies=proxies, timeout=timeout)

            if response.status_code == 200:
                self.mark_proxy_success(proxy)
                return True
            else:
                self.mark_proxy_failed(proxy)
                return False

        except Exception as e:
            # Any failure (connection, timeout, bad proxy URL, ...) counts
            # against the proxy rather than propagating to the caller.
            logger.debug(f"Proxy test failed for {proxy.host}:{proxy.port}: {e}")
            self.mark_proxy_failed(proxy)
            return False

    def _format_proxy_url(self, proxy: ProxyInfo) -> str:
        """Build the proxy URL, embedding credentials when both are set.

        Username and password are percent-encoded so that reserved characters
        such as ``@``, ``:`` or ``/`` cannot corrupt the URL structure.
        """
        if proxy.username and proxy.password:
            user = quote(proxy.username, safe="")
            secret = quote(proxy.password, safe="")
            return f"{proxy.protocol}://{user}:{secret}@{proxy.host}:{proxy.port}"
        else:
            return f"{proxy.protocol}://{proxy.host}:{proxy.port}"

    def get_proxy_dict(self, proxy: ProxyInfo) -> Dict[str, str]:
        """Return the ``proxies`` mapping expected by the requests library.

        The same proxy URL is used for both ``http`` and ``https`` targets.
        """
        proxy_url = self._format_proxy_url(proxy)
        return {
            'http': proxy_url,
            'https': proxy_url
        }

    def test_all_proxies(self, test_url: str = "http://httpbin.org/ip"):
        """Test every proxy in the pool and log the outcome of each.

        Args:
            test_url: URL fetched through each proxy.

        Returns:
            The number of proxies that passed the test.
        """
        logger.info("Testing all proxies...")
        working_count = 0

        # Iterate over a snapshot so concurrent add_proxy() calls cannot
        # disturb the loop. The lock is not held while testing because
        # test_proxy() re-acquires it through mark_proxy_*().
        with self.lock:
            snapshot = list(self.proxies)

        for proxy in snapshot:
            if self.test_proxy(proxy, test_url):
                working_count += 1
                logger.info(f"✓ Proxy {proxy.host}:{proxy.port} is working")
            else:
                logger.warning(f"✗ Proxy {proxy.host}:{proxy.port} failed")

        logger.info(f"Proxy test completed: {working_count}/{len(snapshot)} working")
        return working_count

    def get_stats(self) -> Dict:
        """Return pool statistics.

        Returns:
            A dict with ``total``, ``active`` (currently available),
            ``failed`` (at or above ``max_failures``) and ``success_rate``
            (``active / total``, or 0 for an empty pool).
        """
        with self.lock:
            total = len(self.proxies)
            # Computed first on purpose: the availability check resets the
            # failure count of proxies whose cooldown has elapsed, so they are
            # not also counted as failed below.
            active = len([p for p in self.proxies if self._is_proxy_available(p)])
            failed = len([p for p in self.proxies if p.failed_count >= self.max_failures])

            return {
                'total': total,
                'active': active,
                'failed': failed,
                'success_rate': active / total if total > 0 else 0
            }


# Process-wide proxy pool instance, created lazily.
_proxy_pool = None


def get_global_proxy_pool() -> ProxyPool:
    """Return the global proxy pool, creating an empty one on first use."""
    global _proxy_pool
    if _proxy_pool is None:
        _proxy_pool = ProxyPool()
    return _proxy_pool


def init_proxy_pool(config_file: Optional[str] = None):
    """Replace the global proxy pool with a new one and return it.

    Args:
        config_file: Optional JSON file of proxies to load into the new pool.
    """
    global _proxy_pool
    _proxy_pool = ProxyPool(config_file)
    return _proxy_pool