Setting Proxy IPs in Python: Common Methods and a Practical Guide

In web scraping, API calls, data collection, and similar scenarios, using proxy IPs has become an essential skill for developers. This article takes a deep dive into the various ways to configure proxies in Python, from basic concepts to advanced practice, so you can handle all kinds of network request challenges with ease.

01|Proxy IP Basics and Use Cases

What is a proxy IP?

A proxy IP (proxy server) is an intermediary that sits between the client and the target server: it receives the client's request, forwards it to the target server, and relays the response back to the client. In Python development, using proxies sensibly helps with the following problems.

Core use cases:

Anti-scraping evasion: spread requests across multiple source IPs to reduce the risk of IP bans

Bypassing geo-restrictions: access content limited to specific regions

Anonymity: hide your real IP address to protect privacy

Load balancing: distribute requests across several proxies for better stability

Test environments: simulate user access from different regions

Proxy types at a glance

graph TD
    A[Proxy types] --> B[HTTP proxy]
    A --> C[HTTPS proxy]
    A --> D[SOCKS proxy]
    B --> B1[Well suited to web scraping]
    B --> B2[Fast]
    C --> C1[Encrypted transport]
    C --> C2[Higher security]
    D --> D1[Protocol-agnostic]
    D --> D2[Supports UDP]
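SOCKS proxies appear in the diagram but nowhere else in this article, so here is a minimal sketch of using one with requests. It assumes a SOCKS5 proxy listening at 127.0.0.1:1080 (a placeholder address) and requires the PySocks extra (pip install "requests[socks]"):

import requests

# SOCKS5 proxy; use 'socks5h://' instead to resolve DNS on the proxy side
proxies = {
    'http': 'socks5://127.0.0.1:1080',
    'https': 'socks5://127.0.0.1:1080',
}

try:
    response = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)
    print(response.json())
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")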

02|Proxy Configuration with the requests Library

requests is one of the most widely used HTTP libraries in Python, and its proxy configuration is simple and intuitive.

Basic proxy setup

import requests

# Define the proxy configuration.
# Note: both entries point at an http:// proxy URL. The dictionary key
# ('http'/'https') selects which *target* scheme goes through the proxy;
# an https:// proxy URL would mean TLS to the proxy itself, which most
# proxies do not support.
proxies = {
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
}

# Send a request through the proxy
try:
    response = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)
    print(f"IP as seen through the proxy: {response.json()}")
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")

Proxy with authentication

import requests

# A proxy that requires a username and password
proxies_with_auth = {
    'http': 'http://username:password@proxy.example.com:8080',
    'https': 'http://username:password@proxy.example.com:8080'
}

response = requests.get('https://httpbin.org/ip', proxies=proxies_with_auth, timeout=10)
print(response.text)
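A side note not covered above, but standard requests behavior: requests also honors the HTTP_PROXY, HTTPS_PROXY, and NO_PROXY environment variables, so a proxy can be configured without touching the code at all. A minimal sketch:

import os
import requests

# Proxy settings picked up automatically by requests (and many other tools)
os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8080'
os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:8080'
os.environ['NO_PROXY'] = 'localhost,127.0.0.1'  # hosts to bypass

# No proxies= argument needed; the environment variables apply
response = requests.get('https://httpbin.org/ip', timeout=10)
print(response.json())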

Advanced configuration tips

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Create a session and configure a retry strategy
session = requests.Session()

# Retry on transient failures and common throttling/gateway status codes
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504]
)

adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)

# Proxy configuration
proxies = {
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
}

# Send the request
response = session.get('https://httpbin.org/ip', proxies=proxies)
print(response.json())
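One more requests feature worth knowing (standard library behavior, not shown in the original): a Session has a proxies attribute, so you can set the proxy once and have it apply to every request made through that session:

import requests

session = requests.Session()

# Applies to all requests sent through this session
session.proxies = {
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
}

response = session.get('https://httpbin.org/ip', timeout=10)
print(response.json())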

TRAE IDE debugging tip: in TRAE IDE you can use the built-in network debugging tools to monitor the status codes, response times, and payload sizes of proxied requests in real time, making proxy configuration problems quick to pinpoint.

03|Proxy Configuration with urllib, in Depth

Although requests is more popular, urllib ships with the Python standard library and remains the first choice in some scenarios.

Basic urllib proxy setup

import urllib.request
import urllib.error

# Create a proxy handler
proxy_handler = urllib.request.ProxyHandler({
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
})

# Build an opener
opener = urllib.request.build_opener(proxy_handler)

# Install the opener globally
urllib.request.install_opener(opener)

try:
    # Send a request through the proxy
    response = urllib.request.urlopen('https://httpbin.org/ip')
    print(response.read().decode('utf-8'))
except urllib.error.URLError as e:
    print(f"Request failed: {e}")

urllib proxy with authentication

import urllib.request

# Proxy address and credentials
proxy_url = 'proxy.example.com:8080'
username = 'your_username'
password = 'your_password'

# Create a password manager and register the credentials
password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, proxy_url, username, password)

# A ProxyHandler is needed to actually route traffic through the proxy;
# the auth handler then answers the proxy's 407 challenge with the
# registered credentials
proxy_handler = urllib.request.ProxyHandler({
    'http': f'http://{proxy_url}',
    'https': f'http://{proxy_url}'
})
proxy_auth_handler = urllib.request.ProxyBasicAuthHandler(password_mgr)

# Build and install an opener with both handlers
opener = urllib.request.build_opener(proxy_handler, proxy_auth_handler)
urllib.request.install_opener(opener)

# Send the request
response = urllib.request.urlopen('https://httpbin.org/ip')
print(response.read().decode('utf-8'))
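A simpler alternative that urllib also accepts: embed the credentials directly in the proxy URL passed to ProxyHandler. A minimal sketch, using the same placeholder host and credentials as above:

import urllib.request

# Credentials embedded in the proxy URL; urllib parses and applies them
proxy_handler = urllib.request.ProxyHandler({
    'http': 'http://your_username:your_password@proxy.example.com:8080',
    'https': 'http://your_username:your_password@proxy.example.com:8080'
})

opener = urllib.request.build_opener(proxy_handler)
response = opener.open('https://httpbin.org/ip')
print(response.read().decode('utf-8'))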

04|Asynchronous Proxy Configuration with aiohttp

Asynchronous programming matters more and more in modern Python applications, and aiohttp provides powerful async HTTP functionality.

Basic async proxy setup

import aiohttp
import asyncio

async def fetch_with_proxy():
    # Proxy configuration
    proxy = 'http://127.0.0.1:8080'

    # Create a TCP connector; ssl=False disables certificate verification,
    # which is convenient for local debugging but unsafe in production
    connector = aiohttp.TCPConnector(ssl=False)

    # Create a session
    async with aiohttp.ClientSession(connector=connector) as session:
        try:
            # Send an async request through the proxy
            async with session.get('https://httpbin.org/ip', proxy=proxy) as response:
                data = await response.json()
                print(f"IP as seen through the proxy: {data}")
        except aiohttp.ClientError as e:
            print(f"Request failed: {e}")

# Run the coroutine
asyncio.run(fetch_with_proxy())

Async proxy with authentication

import aiohttp
import asyncio
from aiohttp import BasicAuth

async def fetch_with_auth_proxy():
    # Proxy credentials
    proxy_auth = BasicAuth('username', 'password')
    proxy = 'http://proxy.example.com:8080'

    # Create a session
    async with aiohttp.ClientSession() as session:
        try:
            # Send a request through the authenticated proxy
            async with session.get(
                'https://httpbin.org/ip',
                proxy=proxy,
                proxy_auth=proxy_auth
            ) as response:
                data = await response.json()
                print(f"Authenticated proxy request succeeded: {data}")
        except aiohttp.ClientError as e:
            print(f"Request failed: {e}")

asyncio.run(fetch_with_auth_proxy())
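aiohttp can also pick the proxy up from the environment instead of per-request arguments (documented aiohttp behavior, not covered in the original text): pass trust_env=True when creating the session and it honors HTTP_PROXY/HTTPS_PROXY. A minimal sketch with a placeholder address:

import asyncio
import os

import aiohttp

async def fetch_via_env_proxy():
    # Placeholder proxy address, read from the environment
    os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:8080'

    # trust_env=True makes the session honor HTTP_PROXY/HTTPS_PROXY
    async with aiohttp.ClientSession(trust_env=True) as session:
        async with session.get('https://httpbin.org/ip') as response:
            print(await response.json())

asyncio.run(fetch_via_env_proxy())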

Advanced async proxy pool management

import aiohttp
import asyncio
import random
from typing import List

class AsyncProxyPool:
    def __init__(self, proxies: List[str]):
        self.proxies = proxies
        self.failed_proxies = set()

    def get_random_proxy(self) -> str:
        available_proxies = [p for p in self.proxies if p not in self.failed_proxies]
        if not available_proxies:
            raise Exception("No proxies available")
        return random.choice(available_proxies)

    def mark_proxy_failed(self, proxy: str):
        self.failed_proxies.add(proxy)
        print(f"Proxy {proxy} marked as failed")

    async def fetch_with_retry(self, url: str, max_retries: int = 3):
        for attempt in range(max_retries):
            proxy = self.get_random_proxy()
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        url,
                        proxy=proxy,
                        timeout=aiohttp.ClientTimeout(total=10)
                    ) as response:
                        if response.status == 200:
                            return await response.text()
                        else:
                            print(f"Proxy {proxy} returned status code: {response.status}")
            except Exception as e:
                print(f"Request via proxy {proxy} failed: {e}")
            # Mark the proxy failed on both exceptions and non-200 responses
            self.mark_proxy_failed(proxy)
        raise Exception("All proxies failed")

# Usage example
async def main():
    proxies = [
        'http://127.0.0.1:8080',
        'http://127.0.0.1:8081',
        'http://127.0.0.1:8082'
    ]
    proxy_pool = AsyncProxyPool(proxies)
    try:
        result = await proxy_pool.fetch_with_retry('https://httpbin.org/ip')
        print("Request succeeded:", result[:100])
    except Exception as e:
        print("Gave up:", e)

asyncio.run(main())

05|Proxy Configuration for Selenium WebDriver

For scenarios that require simulating real browser behavior, configuring Selenium's proxy correctly is especially important.

Chrome proxy setup

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import time

def setup_chrome_proxy():
    # Chrome options
    chrome_options = Options()

    # Proxy setting
    proxy = '127.0.0.1:8080'
    chrome_options.add_argument(f'--proxy-server={proxy}')

    # Other commonly used options
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')

    # Create the driver
    driver = webdriver.Chrome(options=chrome_options)
    try:
        # Visit a test page
        driver.get('https://httpbin.org/ip')
        time.sleep(3)

        # Grab the page content
        page_source = driver.page_source
        print(f"Page content: {page_source[:200]}")
    finally:
        driver.quit()

setup_chrome_proxy()
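For completeness, here is a hedged sketch of the equivalent setup for Firefox, which configures the proxy through preferences rather than a command-line switch (standard Selenium/Firefox preference names; the proxy address is a placeholder):

from selenium import webdriver
from selenium.webdriver.firefox.options import Options

def setup_firefox_proxy():
    options = Options()

    # network.proxy.type = 1 selects manual proxy configuration
    options.set_preference('network.proxy.type', 1)
    options.set_preference('network.proxy.http', '127.0.0.1')
    options.set_preference('network.proxy.http_port', 8080)
    options.set_preference('network.proxy.ssl', '127.0.0.1')
    options.set_preference('network.proxy.ssl_port', 8080)

    driver = webdriver.Firefox(options=options)
    try:
        driver.get('https://httpbin.org/ip')
        print(driver.page_source[:200])
    finally:
        driver.quit()

setup_firefox_proxy()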

Selenium proxy with authentication

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import zipfile
import os
import time

def create_proxy_auth_extension(proxy_host, proxy_port, username, password):
    """Create a Chrome extension that handles proxy authentication."""
    manifest_json = """
    {
        "version": "1.0.0",
        "manifest_version": 2,
        "name": "Chrome Proxy",
        "permissions": [
            "proxy",
            "tabs",
            "unlimitedStorage",
            "storage",
            "<all_urls>",
            "webRequest",
            "webRequestBlocking"
        ],
        "background": {
            "scripts": ["background.js"],
            "persistent": true
        },
        "minimum_chrome_version": "22.0.0"
    }
    """

    background_js = f"""
    var config = {{
        mode: "fixed_servers",
        rules: {{
            singleProxy: {{
                scheme: "http",
                host: "{proxy_host}",
                port: parseInt({proxy_port})
            }},
            bypassList: ["localhost"]
        }}
    }};

    chrome.proxy.settings.set({{value: config, scope: "regular"}}, function() {{}});

    function callbackFn(details) {{
        return {{
            authCredentials: {{
                username: "{username}",
                password: "{password}"
            }}
        }};
    }}

    chrome.webRequest.onAuthRequired.addListener(
        callbackFn,
        {{urls: ["<all_urls>"]}},
        ['blocking']
    );
    """

    # Write the extension files into a zip archive
    pluginfile = 'proxy_auth_plugin.zip'
    with zipfile.ZipFile(pluginfile, 'w') as zp:
        zp.writestr("manifest.json", manifest_json)
        zp.writestr("background.js", background_js)
    return pluginfile

def setup_auth_proxy():
    # Proxy details
    proxy_host = 'proxy.example.com'
    proxy_port = 8080
    username = 'your_username'
    password = 'your_password'

    # Build the authentication extension
    pluginfile = create_proxy_auth_extension(proxy_host, proxy_port, username, password)

    # Chrome options
    chrome_options = Options()
    chrome_options.add_extension(pluginfile)

    # Create the driver
    driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get('https://httpbin.org/ip')
        time.sleep(3)
        print(f"Page title: {driver.title}")
    finally:
        driver.quit()
        # Clean up the extension file
        if os.path.exists(pluginfile):
            os.remove(pluginfile)

setup_auth_proxy()

TRAE IDE debugging advantage: TRAE IDE's browser automation debugging lets you watch a Selenium script execute in real time, with screenshots and network request details for each step, making proxy configuration issues quick to track down.

06|Proxy Validation and Exception Handling

An effective proxy validation mechanism is key to keeping a scraper stable.

Validating proxies

import requests
import asyncio
import aiohttp
from typing import List, Dict
import time

class ProxyValidator:
    def __init__(self, test_url: str = 'https://httpbin.org/ip'):
        self.test_url = test_url
        self.timeout = 10

    def validate_proxy(self, proxy: Dict[str, str]) -> Dict:
        """Validate a single proxy."""
        result = {
            'proxy': proxy,
            'is_valid': False,
            'response_time': 0,
            'error': None
        }
        try:
            start_time = time.time()
            response = requests.get(
                self.test_url,
                proxies=proxy,
                timeout=self.timeout
            )
            result['response_time'] = time.time() - start_time
            if response.status_code == 200:
                result['is_valid'] = True
                result['response_data'] = response.json()
            else:
                result['error'] = f"HTTP status code: {response.status_code}"
        except requests.exceptions.ConnectTimeout:
            result['error'] = "Connection timed out"
        except requests.exceptions.ProxyError:
            result['error'] = "Proxy error"
        except requests.exceptions.SSLError:
            result['error'] = "SSL certificate error"
        except Exception as e:
            result['error'] = f"Unexpected error: {str(e)}"
        return result

    async def validate_proxy_async(self, proxy: Dict[str, str]) -> Dict:
        """Validate a proxy asynchronously."""
        result = {
            'proxy': proxy,
            'is_valid': False,
            'response_time': 0,
            'error': None
        }
        try:
            start_time = time.time()
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self.test_url,
                    # aiohttp takes a single proxy URL, so use the first entry
                    proxy=list(proxy.values())[0],
                    timeout=aiohttp.ClientTimeout(total=self.timeout)
                ) as response:
                    result['response_time'] = time.time() - start_time
                    if response.status == 200:
                        result['is_valid'] = True
                        result['response_data'] = await response.json()
                    else:
                        result['error'] = f"HTTP status code: {response.status}"
        except asyncio.TimeoutError:
            result['error'] = "Connection timed out"
        except Exception as e:
            result['error'] = f"Validation failed: {str(e)}"
        return result

    def validate_proxy_list(self, proxies: List[Dict[str, str]]) -> List[Dict]:
        """Validate a list of proxies synchronously."""
        results = []
        for proxy in proxies:
            result = self.validate_proxy(proxy)
            results.append(result)
            print(f"Proxy {proxy}: {'valid' if result['is_valid'] else 'invalid'}")
        return results

    async def validate_proxy_list_async(self, proxies: List[Dict[str, str]]) -> List[Dict]:
        """Validate a list of proxies concurrently."""
        tasks = [self.validate_proxy_async(proxy) for proxy in proxies]
        results = await asyncio.gather(*tasks)
        for result in results:
            proxy = result['proxy']
            print(f"Proxy {proxy}: {'valid' if result['is_valid'] else 'invalid'}")
        return results

# Usage example
validator = ProxyValidator()

# Proxies to test
test_proxies = [
    {'http': 'http://127.0.0.1:8080', 'https': 'http://127.0.0.1:8080'},
    {'http': 'http://invalid.proxy:8080', 'https': 'http://invalid.proxy:8080'}
]

# Synchronous validation
print("=== Synchronous validation ===")
sync_results = validator.validate_proxy_list(test_proxies)

# Asynchronous validation
print("\n=== Asynchronous validation ===")
asyncio.run(validator.validate_proxy_list_async(test_proxies))

A smarter exception-handling mechanism

import requests
import time
import random
from typing import Optional, Dict

class SmartProxyManager:
    def __init__(self, proxies: list, max_retries: int = 3, retry_delay: float = 1.0):
        self.proxies = proxies
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.failed_proxies = set()
        self.proxy_stats = {}

    def get_working_proxy(self) -> Optional[Dict[str, str]]:
        """Return a proxy that has not been marked as failed."""
        available_proxies = [p for p in self.proxies if str(p) not in self.failed_proxies]
        if not available_proxies:
            return None
        # Prefer proxies with the highest observed success rate
        sorted_proxies = sorted(
            available_proxies,
            key=self.get_proxy_success_rate,
            reverse=True
        )
        return sorted_proxies[0]

    def mark_proxy_failed(self, proxy: Dict[str, str]):
        """Mark a proxy as failed."""
        proxy_str = str(proxy)
        self.failed_proxies.add(proxy_str)
        # Update the statistics
        if proxy_str not in self.proxy_stats:
            self.proxy_stats[proxy_str] = {'success_count': 0, 'fail_count': 0}
        self.proxy_stats[proxy_str]['fail_count'] += 1

    def mark_proxy_success(self, proxy: Dict[str, str]):
        """Mark a proxy as successful."""
        proxy_str = str(proxy)
        if proxy_str not in self.proxy_stats:
            self.proxy_stats[proxy_str] = {'success_count': 0, 'fail_count': 0}
        self.proxy_stats[proxy_str]['success_count'] += 1

    def get_proxy_success_rate(self, proxy: Dict[str, str]) -> float:
        """Compute a proxy's success rate."""
        proxy_str = str(proxy)
        stats = self.proxy_stats.get(proxy_str, {'success_count': 0, 'fail_count': 0})
        total = stats['success_count'] + stats['fail_count']
        if total == 0:
            return 0.0
        return stats['success_count'] / total

    def smart_request(self, url: str, **kwargs) -> Optional[requests.Response]:
        """Send a request, automatically handling proxy failures and retries."""
        for attempt in range(self.max_retries):
            proxy = self.get_working_proxy()
            if not proxy:
                print("No proxies available")
                return None
            try:
                print(f"Trying proxy {proxy} (attempt {attempt + 1})")
                # Add the proxy to the request kwargs
                kwargs['proxies'] = proxy
                kwargs['timeout'] = kwargs.get('timeout', 10)
                response = requests.get(url, **kwargs)
                if response.status_code == 200:
                    self.mark_proxy_success(proxy)
                    print(f"Success! Proxy success rate: {self.get_proxy_success_rate(proxy):.2%}")
                    return response
                else:
                    print(f"Unexpected HTTP status code: {response.status_code}")
                    self.mark_proxy_failed(proxy)
            except requests.exceptions.RequestException as e:
                print(f"Request exception: {e}")
                self.mark_proxy_failed(proxy)
            # Exponential backoff with jitter before the next attempt
            if attempt < self.max_retries - 1:
                delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Waiting {delay:.1f}s before retrying...")
                time.sleep(delay)
        return None

# Usage example
proxies = [
    {'http': 'http://127.0.0.1:8080', 'https': 'http://127.0.0.1:8080'},
    {'http': 'http://proxy1.example.com:8080', 'https': 'http://proxy1.example.com:8080'},
    {'http': 'http://proxy2.example.com:8080', 'https': 'http://proxy2.example.com:8080'}
]

manager = SmartProxyManager(proxies)

# Smart request
response = manager.smart_request('https://httpbin.org/ip')
if response:
    print(f"Final success! Response: {response.json()}")
else:
    print("All proxies failed")

07|Building and Managing a Proxy Pool

An efficient proxy pool is at the heart of any large scraping project. A good pool needs to acquire, validate, schedule, and monitor proxies automatically.

A complete proxy pool architecture

import asyncio
import aiohttp
import time
import random
import sqlite3
from typing import List, Dict, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProxyPool:
    """A high-performance proxy pool manager."""

    def __init__(self, db_path: str = 'proxy_pool.db', max_concurrent_tests: int = 10):
        self.db_path = db_path
        self.max_concurrent_tests = max_concurrent_tests
        self.test_url = 'https://httpbin.org/ip'
        self.timeout = 15
        self._init_database()

    def _init_database(self):
        """Initialize the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS proxies (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                proxy TEXT UNIQUE NOT NULL,
                protocol TEXT NOT NULL,
                ip TEXT NOT NULL,
                port INTEGER NOT NULL,
                is_valid BOOLEAN DEFAULT 1,
                response_time REAL,
                success_count INTEGER DEFAULT 0,
                fail_count INTEGER DEFAULT 0,
                last_tested TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # executescript is required here: cursor.execute() only accepts a
        # single SQL statement at a time
        cursor.executescript('''
            CREATE INDEX IF NOT EXISTS idx_proxy ON proxies(proxy);
            CREATE INDEX IF NOT EXISTS idx_valid ON proxies(is_valid);
            CREATE INDEX IF NOT EXISTS idx_last_tested ON proxies(last_tested);
        ''')
        conn.commit()
        conn.close()

    def add_proxy(self, proxy: str, protocol: str = 'http') -> bool:
        """Add a proxy to the database."""
        try:
            # Parse the proxy address
            if '@' in proxy:
                # Format: username:password@ip:port
                auth_part, addr_part = proxy.split('@')
                ip, port = addr_part.split(':')
            else:
                # Format: ip:port
                ip, port = proxy.split(':')
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO proxies
                (proxy, protocol, ip, port, updated_at)
                VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
            ''', (proxy, protocol, ip, int(port)))
            conn.commit()
            conn.close()
            logger.info(f"Proxy {proxy} added to the database")
            return True
        except Exception as e:
            logger.error(f"Failed to add proxy: {e}")
            return False

    def add_proxies_batch(self, proxies: List[Dict[str, str]]):
        """Add proxies in bulk."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        for proxy_info in proxies:
            proxy = proxy_info.get('proxy')
            protocol = proxy_info.get('protocol', 'http')
            if proxy:
                try:
                    if '@' in proxy:
                        auth_part, addr_part = proxy.split('@')
                        ip, port = addr_part.split(':')
                    else:
                        ip, port = proxy.split(':')
                    cursor.execute('''
                        INSERT OR REPLACE INTO proxies
                        (proxy, protocol, ip, port, updated_at)
                        VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                    ''', (proxy, protocol, ip, int(port)))
                except Exception as e:
                    logger.error(f"Failed to add proxy {proxy}: {e}")
        conn.commit()
        conn.close()
        logger.info(f"Batch insert of {len(proxies)} proxies complete")

    async def test_proxy_async(self, session: aiohttp.ClientSession, proxy_info: Dict) -> Dict:
        """Test a single proxy asynchronously."""
        proxy = proxy_info['proxy']
        protocol = proxy_info['protocol']
        result = {
            'proxy': proxy,
            'is_valid': False,
            'response_time': 0,
            'error': None
        }
        try:
            start_time = time.time()
            proxy_url = f"{protocol}://{proxy}"
            async with session.get(
                self.test_url,
                proxy=proxy_url,
                timeout=aiohttp.ClientTimeout(total=self.timeout)
            ) as response:
                result['response_time'] = time.time() - start_time
                if response.status == 200:
                    result['is_valid'] = True
                    data = await response.json()
                    result['response_data'] = data
                else:
                    result['error'] = f"HTTP status code: {response.status}"
        except asyncio.TimeoutError:
            result['error'] = "Connection timed out"
        except aiohttp.ClientError as e:
            result['error'] = f"Client error: {str(e)}"
        except Exception as e:
            result['error'] = f"Unexpected error: {str(e)}"
        return result

    async def test_all_proxies_async(self):
        """Test all proxies asynchronously."""
        # Fetch all proxies due for testing
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            SELECT proxy, protocol FROM proxies
            WHERE is_valid = 1 OR
                  (last_tested IS NULL OR last_tested < datetime('now', '-1 hour'))
        ''')
        proxies_to_test = cursor.fetchall()
        conn.close()
        if not proxies_to_test:
            logger.info("No proxies need testing")
            return
        logger.info(f"Testing {len(proxies_to_test)} proxies")
        # Create a session and cap the number of concurrent connections
        connector = aiohttp.TCPConnector(limit=self.max_concurrent_tests)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Process in batches to avoid creating too many tasks at once
            batch_size = self.max_concurrent_tests
            for i in range(0, len(proxies_to_test), batch_size):
                batch = proxies_to_test[i:i + batch_size]
                # Build the test tasks
                tasks = []
                for proxy, protocol in batch:
                    proxy_info = {'proxy': proxy, 'protocol': protocol}
                    tasks.append(self.test_proxy_async(session, proxy_info))
                # Run the tests
                results = await asyncio.gather(*tasks, return_exceptions=True)
                # Persist the results
                await self._update_proxy_results(results)
                logger.info(f"Finished batch {i // batch_size + 1}")
                # Brief pause so we don't hammer the test endpoint
                await asyncio.sleep(1)
        logger.info("Proxy testing complete")

    async def _update_proxy_results(self, results: List):
        """Write proxy test results back to the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Test task raised: {result}")
                continue
            proxy = result['proxy']
            is_valid = result['is_valid']
            response_time = result.get('response_time', 0)
            if is_valid:
                cursor.execute('''
                    UPDATE proxies
                    SET is_valid = 1, response_time = ?, success_count = success_count + 1,
                        last_tested = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
                    WHERE proxy = ?
                ''', (response_time, proxy))
            else:
                cursor.execute('''
                    UPDATE proxies
                    SET is_valid = 0, fail_count = fail_count + 1,
                        last_tested = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
                    WHERE proxy = ?
                ''', (proxy,))
            logger.info(f"Proxy {proxy}: {'valid' if is_valid else 'invalid'}")
        conn.commit()
        conn.close()

    def get_valid_proxies(self, limit: int = 10) -> List[Dict]:
        """Return a list of valid proxies."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            SELECT proxy, protocol, response_time, success_count, fail_count
            FROM proxies
            WHERE is_valid = 1
            ORDER BY response_time ASC, success_count DESC
            LIMIT ?
        ''', (limit,))
        proxies = []
        for row in cursor.fetchall():
            proxy, protocol, response_time, success_count, fail_count = row
            # Compute the success rate
            total = success_count + fail_count
            success_rate = success_count / total if total > 0 else 0
            proxies.append({
                'proxy': proxy,
                'protocol': protocol,
                'response_time': response_time or 999,
                'success_rate': success_rate
            })
        conn.close()
        return proxies

    def get_random_proxy(self, weighted: bool = True) -> Optional[Dict[str, str]]:
        """Return a random proxy."""
        valid_proxies = self.get_valid_proxies(limit=50)
        if not valid_proxies:
            return None
        if weighted:
            # Weighted random choice based on success rate
            weights = [p['success_rate'] for p in valid_proxies]
            if sum(weights) == 0:
                # If every weight is zero, fall back to equal weights
                weights = [1] * len(valid_proxies)
            selected = random.choices(valid_proxies, weights=weights, k=1)[0]
        else:
            selected = random.choice(valid_proxies)
        return {
            'http': f"{selected['protocol']}://{selected['proxy']}",
            'https': f"{selected['protocol']}://{selected['proxy']}"
        }

    def get_proxy_stats(self) -> Dict:
        """Return summary statistics for the pool."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Overall counts
        cursor.execute('SELECT COUNT(*) FROM proxies')
        total = cursor.fetchone()[0]
        cursor.execute('SELECT COUNT(*) FROM proxies WHERE is_valid = 1')
        valid = cursor.fetchone()[0]
        cursor.execute('SELECT COUNT(*) FROM proxies WHERE is_valid = 0')
        invalid = cursor.fetchone()[0]
        # Average response time
        cursor.execute('SELECT AVG(response_time) FROM proxies WHERE is_valid = 1 AND response_time IS NOT NULL')
        avg_response_time = cursor.fetchone()[0] or 0
        conn.close()
        return {
            'total': total,
            'valid': valid,
            'invalid': invalid,
            'valid_rate': valid / total if total > 0 else 0,
            'avg_response_time': round(avg_response_time, 2)
        }

    def cleanup_invalid_proxies(self, days: int = 7):
        """Delete proxies that have been invalid for a long time."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            DELETE FROM proxies
            WHERE is_valid = 0 AND
                  (last_tested < datetime('now', '-' || ? || ' days') OR last_tested IS NULL)
        ''', (days,))
        deleted_count = cursor.rowcount
        conn.commit()
        conn.close()
        logger.info(f"Removed {deleted_count} long-invalid proxies")
        return deleted_count

# Usage example
async def main():
    # Create the pool
    proxy_pool = ProxyPool()

    # Add a few test proxies
    test_proxies = [
        {'proxy': '127.0.0.1:8080', 'protocol': 'http'},
        {'proxy': '127.0.0.1:8081', 'protocol': 'http'},
        {'proxy': '127.0.0.1:8082', 'protocol': 'http'}
    ]
    proxy_pool.add_proxies_batch(test_proxies)

    # Test every proxy
    await proxy_pool.test_all_proxies_async()

    # Summary statistics
    stats = proxy_pool.get_proxy_stats()
    print(f"Pool statistics: {stats}")

    # Fetch valid proxies
    valid_proxies = proxy_pool.get_valid_proxies(limit=5)
    print(f"Valid proxies: {valid_proxies}")

    # Grab a random proxy
    random_proxy = proxy_pool.get_random_proxy()
    print(f"Random proxy: {random_proxy}")

# Run the example
# asyncio.run(main())

08|Best-Practice Recommendations for Real Projects

Based on years of experience with proxies, here are practices that have proven effective in real projects:

1. Proxy acquisition strategy

Free proxy sources (fine for learning and testing):

Sites such as 西刺代理, 快代理, and 89免费代理

Open-source proxy pool projects on GitHub

Free trials offered by the major proxy providers

Recommended paid proxies (for production use):

阿布云: good stability, suited to enterprise applications

快代理: good value for money, supports multiple protocols

芝麻代理: many nodes within China, fast response times

2. Proxy usage strategy

# A recommended proxy configuration structure
PROXY_CONFIG = {
    'rotation_enabled': True,          # Enable proxy rotation
    'retry_on_failure': True,          # Retry on failure
    'max_retries': 3,                  # Maximum number of retries
    'request_timeout': 15,             # Request timeout (seconds)
    'retry_delay': 1,                  # Delay between retries (seconds)
    'success_rate_threshold': 0.8,     # Success-rate threshold
    'response_time_threshold': 5       # Response-time threshold (seconds)
}
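To make the structure concrete, here is a minimal, hypothetical sketch of how a request helper might consume this config; the helper name and logic are illustrative, not from the original article:

import time
import requests

def request_with_config(url: str, proxies: dict, config: dict = PROXY_CONFIG):
    """Illustrative helper: apply the PROXY_CONFIG policy to one request."""
    retries = config['max_retries'] if config['retry_on_failure'] else 1
    for attempt in range(retries):
        try:
            response = requests.get(
                url,
                proxies=proxies,
                timeout=config['request_timeout']
            )
            if response.status_code == 200:
                return response
        except requests.exceptions.RequestException:
            pass  # fall through to the retry delay
        time.sleep(config['retry_delay'])
    return None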

3. Error handling and monitoring

import logging
from dataclasses import dataclass

@dataclass
class ProxyMetrics:
    """Proxy performance metrics."""
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    avg_response_time: float = 0.0
    blocked_count: int = 0

class ProxyMonitor:
    """Proxy performance monitor."""

    def __init__(self):
        self.metrics = ProxyMetrics()
        self.logger = logging.getLogger(__name__)

    def record_request(self, success: bool, response_time: float, blocked: bool = False):
        """Record the outcome of one request."""
        self.metrics.total_requests += 1
        if success:
            self.metrics.successful_requests += 1
            # Update the running average response time
            total_time = self.metrics.avg_response_time * (self.metrics.total_requests - 1) + response_time
            self.metrics.avg_response_time = total_time / self.metrics.total_requests
        else:
            self.metrics.failed_requests += 1
        if blocked:
            self.metrics.blocked_count += 1

    def get_success_rate(self) -> float:
        """Return the success rate."""
        if self.metrics.total_requests == 0:
            return 0.0
        return self.metrics.successful_requests / self.metrics.total_requests

    def get_report(self) -> dict:
        """Return a monitoring report."""
        return {
            'total_requests': self.metrics.total_requests,
            'successful_requests': self.metrics.successful_requests,
            'failed_requests': self.metrics.failed_requests,
            'success_rate': f"{self.get_success_rate():.2%}",
            'avg_response_time': f"{self.metrics.avg_response_time:.2f}s",
            'blocked_count': self.metrics.blocked_count,
            'block_rate': f"{self.metrics.blocked_count / max(self.metrics.total_requests, 1):.2%}"
        }

    def should_alert(self, threshold: float = 0.7) -> bool:
        """Decide whether the success rate warrants an alert."""
        return self.get_success_rate() < threshold
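A quick usage sketch, continuing from the ProxyMonitor class above (the recorded numbers are made up for illustration):

monitor = ProxyMonitor()

# Record a few fictitious request outcomes
monitor.record_request(success=True, response_time=0.8)
monitor.record_request(success=True, response_time=1.2)
monitor.record_request(success=False, response_time=0.0, blocked=True)

print(monitor.get_report())
if monitor.should_alert(threshold=0.7):
    print("Success rate below threshold - consider rotating proxy sources")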

4. Performance optimization tips

Connection pool tuning:

# Reuse connections via a connection pool
import requests

session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=100,   # Number of connection pools to cache
    pool_maxsize=100,       # Maximum connections per pool
    max_retries=3           # Retry count
)
session.mount('http://', adapter)
session.mount('https://', adapter)

Concurrency control:

# Use a semaphore to cap concurrency
import asyncio
from asyncio import Semaphore

import aiohttp

class ConcurrentProxyManager:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)

    async def fetch_with_proxy(self, url: str, proxy: str):
        async with self.semaphore:
            # The actual request logic
            async with aiohttp.ClientSession() as session:
                async with session.get(url, proxy=proxy) as response:
                    return await response.text()

5. Security considerations

Store proxy credentials safely (see the sketch after this list):

Use environment variables or configuration files

Avoid hard-coding sensitive information in source code

Rotate credentials regularly
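A minimal sketch of the environment-variable approach; the variable names PROXY_USER, PROXY_PASS, and PROXY_HOST are placeholders, not a convention from the original article:

import os
import requests

# Placeholder variable names; set them in your shell or via a .env loader
user = os.environ['PROXY_USER']
password = os.environ['PROXY_PASS']
host = os.environ.get('PROXY_HOST', 'proxy.example.com:8080')

proxies = {
    'http': f'http://{user}:{password}@{host}',
    'https': f'http://{user}:{password}@{host}',
}

response = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)
print(response.json())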

Guard against proxy hijacking:

Use HTTPS proxies to encrypt traffic

Verify the proxy server's SSL certificate

Monitor for anomalous network behavior

Compliance considerations (a throttling sketch follows this list):

Respect the target site's robots.txt rules

Throttle request rates to avoid putting excessive load on the target server

Respect sites' anti-scraping policies
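For the throttling point, here is a minimal, hypothetical rate-limiting sketch: a small helper that enforces a minimum interval between requests (one request per second is just an example figure):

import time
import requests

class Throttler:
    """Enforce a minimum delay between consecutive requests."""

    def __init__(self, min_interval: float = 1.0):
        self.min_interval = min_interval
        self._last_request = 0.0

    def wait(self):
        # Sleep just long enough to respect the minimum interval
        elapsed = time.time() - self._last_request
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last_request = time.time()

throttler = Throttler(min_interval=1.0)  # at most one request per second
for _ in range(3):
    throttler.wait()
    print(requests.get('https://httpbin.org/ip', timeout=10).json())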

TRAE IDE, taken together: TRAE IDE offers not only powerful code editing but also one-stop development tooling that integrates network debugging, performance monitoring, and proxy testing. Its intelligent suggestions and real-time code analysis help you spot potential problems in a proxy configuration quickly and substantially boost development efficiency.

09|Summary and Outlook

This article has covered the main ways to use proxy IPs in Python, from basic concepts to advanced practice, across the scenarios you are most likely to meet in day-to-day development. Mastering these skills will help you:

Improve scraper stability: smart proxy pool management significantly lowers the risk of IP bans

Optimize request performance: choosing high-quality proxy servers improves data collection efficiency

Strengthen error handling: robust exception-handling machinery keeps programs running reliably

Streamline development: professional tools such as TRAE IDE make proxy problems faster to find and fix

As the network landscape keeps changing, proxy technology keeps evolving too. Future proxy services will be more intelligent, with advanced capabilities such as automatic switching, smart routing, and quality scoring. As developers, we need to keep learning and practicing to stay competitive in this fast-moving field.

I hope this article helps you use proxy IPs more effectively in your own projects. Questions and suggestions are welcome in the comments!

