Setting Proxy IPs in Python: Common Methods and a Practical Guide
In web scraping, API calls, data collection, and similar scenarios, using proxy IPs has become an essential skill for developers. This article takes a deep look at the various ways to configure proxies in Python, from basic concepts to advanced practice, to help you handle all kinds of network request challenges with ease.
01|Proxy IP Basics and Application Scenarios
What Is a Proxy IP?
A proxy IP is an intermediate server that sits between the client and the target server: it receives the client's request, forwards it to the target server, and returns the response to the client. In Python development, using proxy IPs sensibly can effectively address the following needs:
Core application scenarios:
Anti-scraping evasion: spread requests across multiple sources to reduce the risk of IP bans
Bypassing geo-restrictions: access content limited to specific regions
Anonymity protection: hide your real IP address to protect privacy
Load balancing: distribute requests across multiple proxies for better stability
Test environments: simulate user access from different regions
Proxy Types Explained
graph TD
A[Proxy types] --> B[HTTP proxy]
A --> C[HTTPS proxy]
A --> D[SOCKS proxy]
B --> B1[Good for web scraping]
B --> B2[Fast]
C --> C1[Encrypted transport]
C --> C2[Higher security]
D --> D1[Protocol-agnostic]
D --> D2[Supports UDP]
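Beyond HTTP/HTTPS proxies, requests can also speak SOCKS once the optional PySocks dependency is installed (pip install requests[socks]). A minimal sketch, assuming a local SOCKS5 proxy listening on port 1080:
import requests
# socks5h:// resolves DNS through the proxy; plain socks5:// resolves it locally
proxies = {
    'http': 'socks5h://127.0.0.1:1080',   # assumed local SOCKS5 proxy
    'https': 'socks5h://127.0.0.1:1080'
}
try:
    response = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)
    print(response.json())
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")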
02|Proxy Configuration with the requests Library
requests is one of the most widely used HTTP libraries in Python, and its proxy configuration is simple and intuitive.
Basic Proxy Setup
import requests
# Define the proxy configuration.
# Note: the value for 'https' is still an http:// URL — most proxies are reached
# over plain HTTP even for HTTPS targets (the request is tunneled via CONNECT)
proxies = {
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
}
# Send a request through the proxy
try:
    response = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)
    print(f"Accessed via proxy, returned IP: {response.json()}")
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
Proxy Configuration with Authentication
import requests
# A proxy that requires a username and password
proxies_with_auth = {
    'http': 'http://username:password@proxy.example.com:8080',
    'https': 'http://username:password@proxy.example.com:8080'
}
response = requests.get('https://httpbin.org/ip', proxies=proxies_with_auth)
print(response.text)
Advanced Configuration Techniques
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Create a session and configure a retry strategy
session = requests.Session()
# Retry policy: up to 3 attempts with exponential backoff on these status codes
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# Proxy configuration
proxies = {
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
}
# Send the request
response = session.get('https://httpbin.org/ip', proxies=proxies)
print(response.json())
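If every request in a session should go through the same proxy, you can also bind the proxy to the session itself rather than passing proxies= on each call. A small sketch, with the same assumed local proxy:
import requests
session = requests.Session()
# Proxies set here apply to every request made through this session
session.proxies.update({
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
})
response = session.get('https://httpbin.org/ip', timeout=10)
print(response.json())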
TRAE IDE debugging tip: in TRAE IDE you can use the built-in network debugging tools to monitor the status codes, response times, and payload sizes of proxied requests in real time, making it quick to pinpoint proxy configuration problems.
03|Proxy Configuration with urllib in Depth
Although requests is more popular, urllib ships with the Python standard library and remains the first choice in some scenarios.
Basic Proxy Setup with urllib
import urllib.request
import urllib.error
# Create a proxy handler
proxy_handler = urllib.request.ProxyHandler({
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
})
# Build an opener
opener = urllib.request.build_opener(proxy_handler)
# Install the opener globally
urllib.request.install_opener(opener)
try:
    # Send a request through the proxy
    response = urllib.request.urlopen('https://httpbin.org/ip')
    print(response.read().decode('utf-8'))
except urllib.error.URLError as e:
    print(f"Request failed: {e}")
Authenticated Proxies with urllib
import urllib.request
# Create a password manager
password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
# Add the username and password
proxy_url = 'proxy.example.com:8080'
username = 'your_username'
password = 'your_password'
password_mgr.add_password(None, proxy_url, username, password)
# Create the proxy authentication handler
proxy_auth_handler = urllib.request.ProxyBasicAuthHandler(password_mgr)
# A ProxyHandler is still needed to tell urllib which proxy to use;
# the auth handler only supplies the credentials
proxy_handler = urllib.request.ProxyHandler({
    'http': f'http://{proxy_url}',
    'https': f'http://{proxy_url}'
})
# Build and install the opener
opener = urllib.request.build_opener(proxy_handler, proxy_auth_handler)
urllib.request.install_opener(opener)
# Send the request
response = urllib.request.urlopen('https://httpbin.org/ip')
print(response.read().decode('utf-8'))
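install_opener() changes the process-wide default, which can surprise other code in the same program. If you prefer to keep the proxy scoped, call open() on the opener directly; a minimal sketch:
import urllib.request
# Build the opener but do not install it globally; other urlopen() calls are unaffected
proxy_handler = urllib.request.ProxyHandler({
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
})
opener = urllib.request.build_opener(proxy_handler)
with opener.open('https://httpbin.org/ip', timeout=10) as response:
    print(response.read().decode('utf-8'))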
04|Asynchronous Proxy Configuration with aiohttp
Asynchronous programming matters more and more in modern Python applications, and aiohttp provides powerful async HTTP capabilities.
Basic Async Proxy Setup
import aiohttp
import asyncio
async def fetch_with_proxy():
    # Proxy configuration
    proxy = 'http://127.0.0.1:8080'
    # Create a TCP connector (ssl=False disables certificate checks; acceptable
    # for local testing, not recommended in production)
    connector = aiohttp.TCPConnector(ssl=False)
    # Create the session
    async with aiohttp.ClientSession(connector=connector) as session:
        try:
            # Send an async request through the proxy
            async with session.get('https://httpbin.org/ip', proxy=proxy) as response:
                data = await response.json()
                print(f"Accessed via proxy, returned IP: {data}")
        except aiohttp.ClientError as e:
            print(f"Request failed: {e}")
# Run the coroutine
asyncio.run(fetch_with_proxy())
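Like requests, aiohttp can pick the proxy up from the environment when the session is created with trust_env=True. A minimal sketch, assuming the same local proxy:
import asyncio
import os
import aiohttp
async def fetch_via_env_proxy():
    # With trust_env=True, aiohttp reads HTTP_PROXY/HTTPS_PROXY from the environment
    os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8080'   # assumed local proxy
    os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:8080'
    async with aiohttp.ClientSession(trust_env=True) as session:
        async with session.get('https://httpbin.org/ip') as response:
            print(await response.json())
asyncio.run(fetch_via_env_proxy())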
Authenticated Async Proxies
import aiohttp
import asyncio
from aiohttp import BasicAuth
async def fetch_with_auth_proxy():
    # Proxy credentials
    proxy_auth = BasicAuth('username', 'password')
    proxy = 'http://proxy.example.com:8080'
    # Create the session
    async with aiohttp.ClientSession() as session:
        try:
            # Send a request through the authenticated proxy
            async with session.get(
                'https://httpbin.org/ip',
                proxy=proxy,
                proxy_auth=proxy_auth
            ) as response:
                data = await response.json()
                print(f"Authenticated proxy request succeeded: {data}")
        except aiohttp.ClientError as e:
            print(f"Request failed: {e}")
asyncio.run(fetch_with_auth_proxy())
Advanced Async Proxy Pool Management
import aiohttp
import asyncio
import random
from typing import List
class AsyncProxyPool:
    def __init__(self, proxies: List[str]):
        self.proxies = proxies
        self.failed_proxies = set()
    def get_random_proxy(self) -> str:
        available_proxies = [p for p in self.proxies if p not in self.failed_proxies]
        if not available_proxies:
            raise Exception("No proxies available")
        return random.choice(available_proxies)
    def mark_proxy_failed(self, proxy: str):
        self.failed_proxies.add(proxy)
        print(f"Proxy {proxy} marked as failed")
    async def fetch_with_retry(self, url: str, max_retries: int = 3):
        for attempt in range(max_retries):
            proxy = self.get_random_proxy()
            try:
                async with aiohttp.ClientSession() as session:
                    # aiohttp expects a ClientTimeout object rather than a bare number
                    timeout = aiohttp.ClientTimeout(total=10)
                    async with session.get(url, proxy=proxy, timeout=timeout) as response:
                        if response.status == 200:
                            return await response.text()
                        else:
                            print(f"Proxy {proxy} returned status code: {response.status}")
            except Exception as e:
                print(f"Request via proxy {proxy} failed: {e}")
            self.mark_proxy_failed(proxy)
        raise Exception("All proxies failed")
# Usage example
async def main():
    proxies = [
        'http://127.0.0.1:8080',
        'http://127.0.0.1:8081',
        'http://127.0.0.1:8082'
    ]
    proxy_pool = AsyncProxyPool(proxies)
    try:
        result = await proxy_pool.fetch_with_retry('https://httpbin.org/ip')
        print("Request succeeded:", result[:100])
    except Exception as e:
        print("Final failure:", e)
asyncio.run(main())
05|Selenium WebDriver Proxy Configuration
For scenarios that require simulating real browser behavior, proxy configuration in Selenium is especially important.
Chrome Proxy Setup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import time
def setup_chrome_proxy():
    # Chrome options
    chrome_options = Options()
    # Proxy setting
    proxy = '127.0.0.1:8080'
    chrome_options.add_argument(f'--proxy-server={proxy}')
    # Other commonly used options
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    # Create the driver
    driver = webdriver.Chrome(options=chrome_options)
    try:
        # Visit a test page
        driver.get('https://httpbin.org/ip')
        time.sleep(3)
        # Grab the page content
        page_source = driver.page_source
        print(f"Page content: {page_source[:200]}")
    finally:
        driver.quit()
setup_chrome_proxy()
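The same idea carries over to other browsers. For Firefox, the proxy is set through profile preferences rather than a command-line switch; a minimal sketch, assuming Selenium 4 and the same local proxy:
from selenium import webdriver
def setup_firefox_proxy():
    options = webdriver.FirefoxOptions()
    options.set_preference('network.proxy.type', 1)            # 1 = manual proxy configuration
    options.set_preference('network.proxy.http', '127.0.0.1')  # assumed local proxy
    options.set_preference('network.proxy.http_port', 8080)
    options.set_preference('network.proxy.ssl', '127.0.0.1')   # proxy for HTTPS traffic
    options.set_preference('network.proxy.ssl_port', 8080)
    driver = webdriver.Firefox(options=options)
    try:
        driver.get('https://httpbin.org/ip')
        print(driver.page_source[:200])
    finally:
        driver.quit()
setup_firefox_proxy()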
Authenticated Selenium Proxies
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import zipfile
import os
import time
def create_proxy_auth_extension(proxy_host, proxy_port, username, password):
    """Create a Chrome extension that handles proxy authentication"""
    manifest_json = """
    {
        "version": "1.0.0",
        "manifest_version": 2,
        "name": "Chrome Proxy",
        "permissions": [
            "proxy",
            "tabs",
            "unlimitedStorage",
            "storage",
            "<all_urls>",
            "webRequest",
            "webRequestBlocking"
        ],
        "background": {
            "scripts": ["background.js"],
            "persistent": true
        },
        "minimum_chrome_version": "22.0.0"
    }
    """
    background_js = f"""
    var config = {{
        mode: "fixed_servers",
        rules: {{
            singleProxy: {{
                scheme: "http",
                host: "{proxy_host}",
                port: parseInt({proxy_port})
            }},
            bypassList: ["localhost"]
        }}
    }};
    chrome.proxy.settings.set({{value: config, scope: "regular"}}, function() {{}});
    function callbackFn(details) {{
        return {{
            authCredentials: {{
                username: "{username}",
                password: "{password}"
            }}
        }};
    }}
    chrome.webRequest.onAuthRequired.addListener(
        callbackFn,
        {{urls: ["<all_urls>"]}},
        ['blocking']
    );
    """
    # Write the extension archive
    pluginfile = 'proxy_auth_plugin.zip'
    with zipfile.ZipFile(pluginfile, 'w') as zp:
        zp.writestr("manifest.json", manifest_json)
        zp.writestr("background.js", background_js)
    return pluginfile
def setup_auth_proxy():
    # Proxy details
    proxy_host = 'proxy.example.com'
    proxy_port = 8080
    username = 'your_username'
    password = 'your_password'
    # Build the authentication extension
    pluginfile = create_proxy_auth_extension(proxy_host, proxy_port, username, password)
    # Chrome options
    chrome_options = Options()
    chrome_options.add_extension(pluginfile)
    # Create the driver
    driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get('https://httpbin.org/ip')
        time.sleep(3)
        print(f"Page title: {driver.title}")
    finally:
        driver.quit()
        # Clean up the extension file
        if os.path.exists(pluginfile):
            os.remove(pluginfile)
setup_auth_proxy()
TRAE IDE debugging advantage: with TRAE IDE's browser-automation debugging features you can watch a Selenium script execute in real time, inspect screenshots and network request details for each step, and quickly track down proxy configuration problems.
06|Proxy Validation and Exception Handling
An effective proxy validation mechanism is key to keeping a scraper stable.
Validating Proxy Availability
import requests
import asyncio
import aiohttp
from typing import List, Dict
import time
class ProxyValidator:
    def __init__(self, test_url: str = 'https://httpbin.org/ip'):
        self.test_url = test_url
        self.timeout = 10
    def validate_proxy(self, proxy: Dict[str, str]) -> Dict:
        """Validate a single proxy"""
        result = {
            'proxy': proxy,
            'is_valid': False,
            'response_time': 0,
            'error': None
        }
        try:
            start_time = time.time()
            response = requests.get(
                self.test_url,
                proxies=proxy,
                timeout=self.timeout
            )
            result['response_time'] = time.time() - start_time
            if response.status_code == 200:
                result['is_valid'] = True
                result['response_data'] = response.json()
            else:
                result['error'] = f"HTTP status code: {response.status_code}"
        except requests.exceptions.ConnectTimeout:
            result['error'] = "Connection timed out"
        except requests.exceptions.ProxyError:
            result['error'] = "Proxy error"
        except requests.exceptions.SSLError:
            result['error'] = "SSL certificate error"
        except Exception as e:
            result['error'] = f"Unknown error: {str(e)}"
        return result
    async def validate_proxy_async(self, proxy: Dict[str, str]) -> Dict:
        """Validate a proxy asynchronously"""
        result = {
            'proxy': proxy,
            'is_valid': False,
            'response_time': 0,
            'error': None
        }
        try:
            start_time = time.time()
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self.test_url,
                    # aiohttp takes a single proxy URL, so use the first entry
                    proxy=list(proxy.values())[0],
                    timeout=aiohttp.ClientTimeout(total=self.timeout)
                ) as response:
                    result['response_time'] = time.time() - start_time
                    if response.status == 200:
                        result['is_valid'] = True
                        result['response_data'] = await response.json()
                    else:
                        result['error'] = f"HTTP status code: {response.status}"
        except asyncio.TimeoutError:
            result['error'] = "Connection timed out"
        except Exception as e:
            result['error'] = f"Validation failed: {str(e)}"
        return result
    def validate_proxy_list(self, proxies: List[Dict[str, str]]) -> List[Dict]:
        """Validate a list of proxies"""
        results = []
        for proxy in proxies:
            result = self.validate_proxy(proxy)
            results.append(result)
            print(f"Proxy {proxy} result: {'valid' if result['is_valid'] else 'invalid'}")
        return results
    async def validate_proxy_list_async(self, proxies: List[Dict[str, str]]) -> List[Dict]:
        """Validate a list of proxies asynchronously"""
        tasks = [self.validate_proxy_async(proxy) for proxy in proxies]
        results = await asyncio.gather(*tasks)
        for result in results:
            proxy = result['proxy']
            print(f"Proxy {proxy} result: {'valid' if result['is_valid'] else 'invalid'}")
        return results
# Usage example
validator = ProxyValidator()
# Test proxy list
test_proxies = [
    {'http': 'http://127.0.0.1:8080', 'https': 'http://127.0.0.1:8080'},
    {'http': 'http://invalid.proxy:8080', 'https': 'http://invalid.proxy:8080'}
]
# Synchronous validation
print("=== Synchronous validation ===")
sync_results = validator.validate_proxy_list(test_proxies)
# Asynchronous validation
print("\n=== Asynchronous validation ===")
asyncio.run(validator.validate_proxy_list_async(test_proxies))
Smart Exception Handling
import requests
import time
import random
from typing import Optional, Dict
class SmartProxyManager:
    def __init__(self, proxies: list, max_retries: int = 3, retry_delay: float = 1.0):
        self.proxies = proxies
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.failed_proxies = set()
        self.proxy_stats = {}
    def get_working_proxy(self) -> Optional[Dict[str, str]]:
        """Get a usable proxy"""
        available_proxies = [p for p in self.proxies if str(p) not in self.failed_proxies]
        if not available_proxies:
            return None
        # Prefer proxies with the highest success rate
        sorted_proxies = sorted(
            available_proxies,
            key=lambda x: self.proxy_stats.get(str(x), {}).get('success_rate', 0),
            reverse=True
        )
        return sorted_proxies[0]
    def mark_proxy_failed(self, proxy: Dict[str, str]):
        """Mark a proxy as failed"""
        proxy_str = str(proxy)
        self.failed_proxies.add(proxy_str)
        # Update the statistics
        if proxy_str not in self.proxy_stats:
            self.proxy_stats[proxy_str] = {'success_count': 0, 'fail_count': 0}
        self.proxy_stats[proxy_str]['fail_count'] += 1
    def mark_proxy_success(self, proxy: Dict[str, str]):
        """Mark a proxy as successful"""
        proxy_str = str(proxy)
        if proxy_str not in self.proxy_stats:
            self.proxy_stats[proxy_str] = {'success_count': 0, 'fail_count': 0}
        self.proxy_stats[proxy_str]['success_count'] += 1
    def get_proxy_success_rate(self, proxy: Dict[str, str]) -> float:
        """Get a proxy's success rate"""
        proxy_str = str(proxy)
        stats = self.proxy_stats.get(proxy_str, {'success_count': 0, 'fail_count': 0})
        total = stats['success_count'] + stats['fail_count']
        if total == 0:
            return 0.0
        return stats['success_count'] / total
    def smart_request(self, url: str, **kwargs) -> Optional[requests.Response]:
        """Smart request that handles proxy failures and retries automatically"""
        for attempt in range(self.max_retries):
            proxy = self.get_working_proxy()
            if not proxy:
                print("No proxies available")
                return None
            try:
                print(f"Trying proxy {proxy} (attempt {attempt + 1})")
                # Attach the proxy to the request kwargs
                kwargs['proxies'] = proxy
                kwargs['timeout'] = kwargs.get('timeout', 10)
                response = requests.get(url, **kwargs)
                if response.status_code == 200:
                    self.mark_proxy_success(proxy)
                    print(f"Request succeeded! Proxy success rate: {self.get_proxy_success_rate(proxy):.2%}")
                    return response
                else:
                    print(f"Unexpected HTTP status code: {response.status_code}")
                    self.mark_proxy_failed(proxy)
            except requests.exceptions.RequestException as e:
                print(f"Request exception: {e}")
                self.mark_proxy_failed(proxy)
            # Exponential backoff with jitter before the next retry
            if attempt < self.max_retries - 1:
                delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Waiting {delay:.1f} seconds before retrying...")
                time.sleep(delay)
        return None
# Usage example
proxies = [
    {'http': 'http://127.0.0.1:8080', 'https': 'http://127.0.0.1:8080'},
    {'http': 'http://proxy1.example.com:8080', 'https': 'http://proxy1.example.com:8080'},
    {'http': 'http://proxy2.example.com:8080', 'https': 'http://proxy2.example.com:8080'}
]
manager = SmartProxyManager(proxies)
# Smart request
response = manager.smart_request('https://httpbin.org/ip')
if response:
    print(f"Success! Response: {response.json()}")
else:
    print("All proxies failed")
07|Building and Managing a Proxy Pool
Building an efficient proxy pool is at the core of any large scraping project. A good proxy pool needs automatic acquisition, validation, scheduling, and monitoring.
A Complete Proxy Pool Architecture
import asyncio
import aiohttp
import time
import random
import sqlite3
from typing import List, Dict, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProxyPool:
    """High-performance proxy pool manager"""
    def __init__(self, db_path: str = 'proxy_pool.db', max_concurrent_tests: int = 10):
        self.db_path = db_path
        self.max_concurrent_tests = max_concurrent_tests
        self.test_url = 'https://httpbin.org/ip'
        self.timeout = 15
        self._init_database()
    def _init_database(self):
        """Initialize the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS proxies (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                proxy TEXT UNIQUE NOT NULL,
                protocol TEXT NOT NULL,
                ip TEXT NOT NULL,
                port INTEGER NOT NULL,
                is_valid BOOLEAN DEFAULT 1,
                response_time REAL,
                success_count INTEGER DEFAULT 0,
                fail_count INTEGER DEFAULT 0,
                last_tested TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # executescript() is required here: execute() only runs a single statement
        cursor.executescript('''
            CREATE INDEX IF NOT EXISTS idx_proxy ON proxies(proxy);
            CREATE INDEX IF NOT EXISTS idx_valid ON proxies(is_valid);
            CREATE INDEX IF NOT EXISTS idx_last_tested ON proxies(last_tested);
        ''')
        conn.commit()
        conn.close()
    def add_proxy(self, proxy: str, protocol: str = 'http') -> bool:
        """Add a proxy to the database"""
        try:
            # Parse the proxy string
            if '@' in proxy:
                # Format: username:password@ip:port
                auth_part, addr_part = proxy.split('@')
                ip, port = addr_part.split(':')
            else:
                # Format: ip:port
                ip, port = proxy.split(':')
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO proxies
                (proxy, protocol, ip, port, updated_at)
                VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
            ''', (proxy, protocol, ip, int(port)))
            conn.commit()
            conn.close()
            logger.info(f"Proxy {proxy} added to the database")
            return True
        except Exception as e:
            logger.error(f"Failed to add proxy: {e}")
            return False
    def add_proxies_batch(self, proxies: List[Dict[str, str]]):
        """Add proxies in batch"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        for proxy_info in proxies:
            proxy = proxy_info.get('proxy')
            protocol = proxy_info.get('protocol', 'http')
            if proxy:
                try:
                    if '@' in proxy:
                        auth_part, addr_part = proxy.split('@')
                        ip, port = addr_part.split(':')
                    else:
                        ip, port = proxy.split(':')
                    cursor.execute('''
                        INSERT OR REPLACE INTO proxies
                        (proxy, protocol, ip, port, updated_at)
                        VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                    ''', (proxy, protocol, ip, int(port)))
                except Exception as e:
                    logger.error(f"Failed to add proxy {proxy}: {e}")
        conn.commit()
        conn.close()
        logger.info(f"Batch insert of {len(proxies)} proxies complete")
    async def test_proxy_async(self, session: aiohttp.ClientSession, proxy_info: Dict) -> Dict:
        """Test a single proxy asynchronously"""
        proxy = proxy_info['proxy']
        protocol = proxy_info['protocol']
        result = {
            'proxy': proxy,
            'is_valid': False,
            'response_time': 0,
            'error': None
        }
        try:
            start_time = time.time()
            proxy_url = f"{protocol}://{proxy}"
            async with session.get(
                self.test_url,
                proxy=proxy_url,
                timeout=aiohttp.ClientTimeout(total=self.timeout)
            ) as response:
                result['response_time'] = time.time() - start_time
                if response.status == 200:
                    result['is_valid'] = True
                    data = await response.json()
                    result['response_data'] = data
                else:
                    result['error'] = f"HTTP status code: {response.status}"
        except asyncio.TimeoutError:
            result['error'] = "Connection timed out"
        except aiohttp.ClientError as e:
            result['error'] = f"Client error: {str(e)}"
        except Exception as e:
            result['error'] = f"Unknown error: {str(e)}"
        return result
    async def test_all_proxies_async(self):
        """Test all proxies asynchronously"""
        # Fetch all proxies that need testing
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            SELECT proxy, protocol FROM proxies
            WHERE is_valid = 1 OR
                  (last_tested IS NULL OR last_tested < datetime('now', '-1 hour'))
        ''')
        proxies_to_test = cursor.fetchall()
        conn.close()
        if not proxies_to_test:
            logger.info("No proxies need testing")
            return
        logger.info(f"Testing {len(proxies_to_test)} proxies")
        # Create a session and cap connection concurrency
        connector = aiohttp.TCPConnector(limit=self.max_concurrent_tests)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Process in batches to avoid creating too many tasks at once
            batch_size = self.max_concurrent_tests
            for i in range(0, len(proxies_to_test), batch_size):
                batch = proxies_to_test[i:i + batch_size]
                # Build the test tasks
                tasks = []
                for proxy, protocol in batch:
                    proxy_info = {'proxy': proxy, 'protocol': protocol}
                    tasks.append(self.test_proxy_async(session, proxy_info))
                # Run the tests
                results = await asyncio.gather(*tasks, return_exceptions=True)
                # Persist the results
                await self._update_proxy_results(results)
                logger.info(f"Finished batch {i//batch_size + 1}")
                # Brief pause so we don't hammer the test endpoint
                await asyncio.sleep(1)
        logger.info("Proxy testing complete")
    async def _update_proxy_results(self, results: List):
        """Write proxy test results back to the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Test task raised: {result}")
                continue
            proxy = result['proxy']
            is_valid = result['is_valid']
            response_time = result.get('response_time', 0)
            if is_valid:
                cursor.execute('''
                    UPDATE proxies
                    SET is_valid = 1, response_time = ?, success_count = success_count + 1,
                        last_tested = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
                    WHERE proxy = ?
                ''', (response_time, proxy))
            else:
                cursor.execute('''
                    UPDATE proxies
                    SET is_valid = 0, fail_count = fail_count + 1,
                        last_tested = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
                    WHERE proxy = ?
                ''', (proxy,))
            logger.info(f"Proxy {proxy} test result: {'valid' if is_valid else 'invalid'}")
        conn.commit()
        conn.close()
    def get_valid_proxies(self, limit: int = 10) -> List[Dict]:
        """Fetch a list of valid proxies"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            SELECT proxy, protocol, response_time, success_count, fail_count
            FROM proxies
            WHERE is_valid = 1
            ORDER BY response_time ASC, success_count DESC
            LIMIT ?
        ''', (limit,))
        proxies = []
        for row in cursor.fetchall():
            proxy, protocol, response_time, success_count, fail_count = row
            # Compute the success rate
            total = success_count + fail_count
            success_rate = success_count / total if total > 0 else 0
            proxies.append({
                'proxy': proxy,
                'protocol': protocol,
                'response_time': response_time or 999,
                'success_rate': success_rate
            })
        conn.close()
        return proxies
    def get_random_proxy(self, weighted: bool = True) -> Optional[Dict[str, str]]:
        """Pick a random proxy"""
        valid_proxies = self.get_valid_proxies(limit=50)
        if not valid_proxies:
            return None
        if weighted:
            # Weighted random choice based on success rate
            weights = [p['success_rate'] for p in valid_proxies]
            if sum(weights) == 0:
                # If every weight is zero, fall back to uniform weights
                weights = [1] * len(valid_proxies)
            selected = random.choices(valid_proxies, weights=weights, k=1)[0]
        else:
            selected = random.choice(valid_proxies)
        return {
            'http': f"{selected['protocol']}://{selected['proxy']}",
            'https': f"{selected['protocol']}://{selected['proxy']}"
        }
    def get_proxy_stats(self) -> Dict:
        """Get proxy pool statistics"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Overall counts
        cursor.execute('SELECT COUNT(*) FROM proxies')
        total = cursor.fetchone()[0]
        cursor.execute('SELECT COUNT(*) FROM proxies WHERE is_valid = 1')
        valid = cursor.fetchone()[0]
        cursor.execute('SELECT COUNT(*) FROM proxies WHERE is_valid = 0')
        invalid = cursor.fetchone()[0]
        # Average response time
        cursor.execute('SELECT AVG(response_time) FROM proxies WHERE is_valid = 1 AND response_time IS NOT NULL')
        avg_response_time = cursor.fetchone()[0] or 0
        conn.close()
        return {
            'total': total,
            'valid': valid,
            'invalid': invalid,
            'valid_rate': valid / total if total > 0 else 0,
            'avg_response_time': round(avg_response_time, 2)
        }
    def cleanup_invalid_proxies(self, days: int = 7):
        """Delete proxies that have been invalid for a long time"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            DELETE FROM proxies
            WHERE is_valid = 0 AND
                  (last_tested < datetime('now', '-' || ? || ' days') OR last_tested IS NULL)
        ''', (days,))
        deleted_count = cursor.rowcount
        conn.commit()
        conn.close()
        logger.info(f"Removed {deleted_count} long-term invalid proxies")
        return deleted_count
# Usage example
async def main():
    # Create the proxy pool
    proxy_pool = ProxyPool()
    # Add a few test proxies
    test_proxies = [
        {'proxy': '127.0.0.1:8080', 'protocol': 'http'},
        {'proxy': '127.0.0.1:8081', 'protocol': 'http'},
        {'proxy': '127.0.0.1:8082', 'protocol': 'http'}
    ]
    proxy_pool.add_proxies_batch(test_proxies)
    # Test every proxy
    await proxy_pool.test_all_proxies_async()
    # Print pool statistics
    stats = proxy_pool.get_proxy_stats()
    print(f"Proxy pool stats: {stats}")
    # Fetch valid proxies
    valid_proxies = proxy_pool.get_valid_proxies(limit=5)
    print(f"Valid proxies: {valid_proxies}")
    # Pick a random proxy
    random_proxy = proxy_pool.get_random_proxy()
    print(f"Random proxy: {random_proxy}")
# Run the example
# asyncio.run(main())
08|Best Practices for Real-World Projects
Based on extensive experience using proxies in production, the following practices have proven highly effective in real projects:
1. Proxy Sourcing Strategy
Free proxy sources (good for learning and testing):
Sites such as Xici Proxy, Kuaidaili, and 89 Free Proxy
Open-source proxy pool projects on GitHub
Free trials offered by the major proxy providers
Recommended paid proxies (for production use):
Abuyun: very stable, well suited to enterprise applications
Kuaidaili: good value for money, supports multiple protocols
Zhima Proxy: plenty of nodes in mainland China, fast response times
2. Proxy Usage Strategy
# A recommended proxy configuration structure
PROXY_CONFIG = {
    'rotation_enabled': True,          # Enable proxy rotation
    'retry_on_failure': True,          # Retry on failure
    'max_retries': 3,                  # Maximum number of retries
    'request_timeout': 15,             # Request timeout (seconds)
    'retry_delay': 1,                  # Delay between retries (seconds)
    'success_rate_threshold': 0.8,     # Success rate threshold
    'response_time_threshold': 5       # Response time threshold (seconds)
}
3. Error Handling and Monitoring
import logging
from dataclasses import dataclass
@dataclass
class ProxyMetrics:
    """Proxy performance metrics"""
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    avg_response_time: float = 0.0
    blocked_count: int = 0
class ProxyMonitor:
    """Proxy performance monitor"""
    def __init__(self):
        self.metrics = ProxyMetrics()
        self.logger = logging.getLogger(__name__)
    def record_request(self, success: bool, response_time: float, blocked: bool = False):
        """Record the outcome of a request"""
        self.metrics.total_requests += 1
        if success:
            self.metrics.successful_requests += 1
            # Update the running average response time
            total_time = self.metrics.avg_response_time * (self.metrics.total_requests - 1) + response_time
            self.metrics.avg_response_time = total_time / self.metrics.total_requests
        else:
            self.metrics.failed_requests += 1
        if blocked:
            self.metrics.blocked_count += 1
    def get_success_rate(self) -> float:
        """Compute the success rate"""
        if self.metrics.total_requests == 0:
            return 0.0
        return self.metrics.successful_requests / self.metrics.total_requests
    def get_report(self) -> dict:
        """Produce a monitoring report"""
        return {
            'total_requests': self.metrics.total_requests,
            'successful_requests': self.metrics.successful_requests,
            'failed_requests': self.metrics.failed_requests,
            'success_rate': f"{self.get_success_rate():.2%}",
            'avg_response_time': f"{self.metrics.avg_response_time:.2f}s",
            'blocked_count': self.metrics.blocked_count,
            'block_rate': f"{self.metrics.blocked_count / max(self.metrics.total_requests, 1):.2%}"
        }
    def should_alert(self, threshold: float = 0.7) -> bool:
        """Decide whether an alert should fire"""
        return self.get_success_rate() < threshold
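A quick usage example for the monitor defined above:
monitor = ProxyMonitor()
monitor.record_request(success=True, response_time=0.8)
monitor.record_request(success=False, response_time=0.0, blocked=True)
print(monitor.get_report())
if monitor.should_alert(threshold=0.7):
    print("Success rate below threshold; consider rotating proxy sources")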
4. Performance Optimization Tips
Connection pool tuning:
# Reuse connections through a connection pool
import requests
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=100,  # Connection pool size
    pool_maxsize=100,      # Maximum number of connections
    max_retries=3          # Number of retries
)
session.mount('http://', adapter)
session.mount('https://', adapter)
Concurrency control:
# Use a semaphore to cap concurrency
import asyncio
import aiohttp
from asyncio import Semaphore
class ConcurrentProxyManager:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)
    async def fetch_with_proxy(self, url: str, proxy: str):
        async with self.semaphore:
            # The actual request logic
            async with aiohttp.ClientSession() as session:
                async with session.get(url, proxy=proxy) as response:
                    return await response.text()
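And a short usage example: fan out several requests while the semaphore keeps at most five in flight (127.0.0.1:8080 is again an assumed local proxy):
async def run_batch():
    manager = ConcurrentProxyManager(max_concurrent=5)
    urls = ['https://httpbin.org/ip'] * 10
    tasks = [manager.fetch_with_proxy(url, 'http://127.0.0.1:8080') for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print(f"Completed {len(results)} requests")
# asyncio.run(run_batch())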
5. Security Considerations
Store proxy credentials securely (a sketch follows this list):
Use environment variables or configuration files
Avoid hardcoding sensitive information in code
Rotate credentials regularly
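A minimal sketch of loading proxy credentials from the environment (PROXY_USER, PROXY_PASS, and PROXY_HOST are assumed variable names, not a standard):
import os
import requests
user = os.getenv('PROXY_USER')
password = os.getenv('PROXY_PASS')
host = os.getenv('PROXY_HOST', 'proxy.example.com:8080')
# Credentials never appear in the source code itself
proxies = {
    'http': f'http://{user}:{password}@{host}',
    'https': f'http://{user}:{password}@{host}'
}
response = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)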
Guard against proxy hijacking:
Use HTTPS proxies to encrypt traffic in transit
Verify the proxy server's SSL certificate
Monitor for anomalous network behavior
Compliance considerations:
Respect the target site's robots.txt rules
Throttle request rates to avoid putting excessive load on target servers
Respect sites' anti-scraping policies
TRAE IDE at a glance: beyond powerful code editing, TRAE IDE integrates network debugging, performance monitoring, proxy testing, and other development tools in one place. Its intelligent suggestions and real-time code analysis help you quickly spot potential problems in proxy configuration and significantly boost development efficiency.
09|Summary and Outlook
This article has covered the main ways to use proxy IPs in Python, from basic concepts to advanced practice, across the scenarios you are most likely to meet in day-to-day development. Mastering these skills will help you:
Improve scraper stability: smart proxy pool management significantly reduces the risk of IP bans
Optimize request performance: choosing high-quality proxy servers improves data collection efficiency
Strengthen error handling: robust exception handling keeps your programs running reliably
Streamline development: tools such as TRAE IDE help you locate and resolve proxy-related issues quickly
As the network landscape keeps changing, proxy technology continues to evolve. Future proxy services will be smarter, with advanced capabilities such as automatic switching, intelligent routing, and quality scoring. As developers, we need to keep learning and practicing to stay competitive in this fast-moving field.
I hope this article helps you use proxy IPs more effectively in your projects. If you have questions or suggestions, feel free to share them in the comments!
(This content was produced with AI assistance and is for reference only.)