|

楼主 |
发表于 2020-10-6 15:51:22
|
显示全部楼层
本帖最后由 Tinken 于 2020-10-6 22:17 编辑
- 新增 多线程代理ip获取功能:加快免费代理ip获取速度
- 新增 多线程ip检测功能:加快ip可用性的检测速度
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # @ClassName proxy_pool
- # @Description TODO 代理ip 地址池
- # @Author lanlo
- # @Date 2020-10-03 22:48
- # @Version 1.0
- import requests
- from bs4 import BeautifulSoup
- import redis
- import time
- import random
- import datetime
- # 多线程模块
- import threading
- # 定时任务模块
- from apscheduler.schedulers.background import BlockingScheduler
- # 线程池
- get_ip_threads = []
- ip_threads = []
- # 快代理 国内高匿
- url_inha = "https://www.kuaidaili.com/free/inha/"
- # 快代理 国内普通
- url_intr = "https://www.kuaidaili.com/free/intr/"
- # 89代理
- url_89 = "https://www.89ip.cn/index_.html"
- # 西拉代理
- url_xl = "http://www.xiladaili.com/gaoni/"
- # 本地redis连接
- pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
- # 存储爬取的代理ip
- redis_conn = redis.Redis(connection_pool=pool, max_connections=10, db=0)
- # 模拟浏览器的请求头
- headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.62 Safari/537.36'
- }
- # 将ip和端口 写入redis数据库
- def add_ip(ip, port, redis_conn):
- redis_conn.zadd("ip", {ip: port}, nx=False)
- print("redis 添加 --> {}:{} ".format(ip, port))
- def spider_ip(url, redis_conn):
- print("从{}中:爬取免费代理ip".format(url))
- try:
- # 从kuaidaili获得免费代理ip
- if "kuaidaili" in url:
- print("从kuaidaili获取代理ip")
- for i in range(1, 6):
- # 拼接页面后的链接
- url_ = url + str(i)
- print("循环{}:{}".format(i, url_))
- # requests请求,获取html页面
- res = requests.get(url_, headers=headers)
- print(res.status_code)
- # 将html页面转化为BeautifulSoup文件,并设置解析器
- soup = BeautifulSoup(res.text, "html.parser")
- # 经过查看元素,我们需要的ip数据都在一个表格中,我们查找所有行,对每一行的ip和端口进行单独处理
- for child in soup.find_all("tr"):
- # 会有页面其他行搜索进来,使用try避免后续错误
- try:
- # 分析元素得到ip
- ip = child.select("td[data-title='IP']")[0].string
- # 分析元素得到端口号
- port = child.select("td[data-title='PORT']")[0].string
- print("以获取:{}:{}".format(ip, port))
- # 调用redis写入函数,将数据写入redis
- add_ip(str(ip), str(port), redis_conn)
- except:
- print("*** 找到一个不含IP的tr ***")
- print("快代理免费代理 第{}页代理IP 获取完成".format(i))
- # 延迟10秒,避免应请求太快被服务器限制IP
- time.sleep(5)
- # 从89ip获得免费代理ip
- elif "89ip" in url:
- print("从89ip获取代理ip")
- for i in range(1, 6):
- # 拼接页面后的链接
- url_ = "https://www.89ip.cn/index_{}.html".format(i)
- print("循环{}:{}".format(i, url_))
- # requests请求,获取html页面
- res = requests.get(url_, headers=headers)
- print("页面响应:", res.status_code)
- # 将html页面转化为BeautifulSoup文件,并设置解析器
- soup = BeautifulSoup(res.text, "html.parser")
- # 经过查看元素,我们需要的ip数据都在一个表格中,我们查找所有行,对每一行的ip和端口进行单独处理
- for child in soup.find_all("tr"):
- # 会有页面其他行搜索进来,使用try避免后续错误
- try:
- # 分析元素得到ip
- ip = child.select("td")[0].string.replace("\n", '').replace("\t", "")
- # 分析元素得到端口号
- port = child.select("td")[1].string.replace("\n", '').replace("\t", "")
- print("已获取:{}:{}".format(ip, port))
- # 调用redis写入函数,将数据写入redis
- add_ip(str(ip), str(port), redis_conn)
- except:
- print("*** 找到一个不含IP的tr ***")
- print("89ip免费代理 第{}页代理IP 获取完成".format(i))
- # 延迟10秒,避免应请求太快被服务器限制IP
- time.sleep(5)
- # 从西拉获取代理ip
- elif "xiladaili" in url:
- print("从西拉获取代理ip")
- for i in range(1, 2):
- # 拼接页面后的链接
- url_ = url + str(i)
- print("循环{}:{}".format(i, url_))
- # requests请求,获取html页面
- res = requests.get(url_, headers=headers)
- print("页面响应:", res.status_code)
- # 将html页面转化为BeautifulSoup文件,并设置解析器
- soup = BeautifulSoup(res.text, "html.parser")
- # 经过查看元素,我们需要的ip数据都在一个表格中,我们查找所有行,对每一行的ip和端口进行单独处理
- for child in soup.find_all("tr"):
- # 会有页面其他行搜索进来,使用try避免后续错误
- try:
- # 分析元素得到ip
- ip = child.select("td")[0].string.split(":")[0]
- port = child.select("td")[0].string.split(":")[1]
- print("已获取:{}:{}".format(ip, port))
- # 调用redis写入函数,将数据写入redis
- add_ip(str(ip), str(port), redis_conn)
- except:
- print("*** 找到一个不含IP的tr ***")
- print("西拉免费代理 第{}页代理IP 获取完成".format(i))
- # 延迟10秒,避免应请求太快被服务器限制IP
- time.sleep(5)
- else:
- print("该网址{}无法获取代理ip地址,请检查源码".format(url))
- except:
- print("网址{}请求失败".format(url))
- # 获取redis 拥有ip的数量
- def get_ip_num(redis_conn):
- num = redis_conn.zcard("ip")
- print("redis 拥有ip的数量:{}个".format(num))
- return num
- # 获取ip的端口
- def get_port(ip,redis_conn):
- port = redis_conn.zscore("ip", ip)
- port = str(port).replace(".0", "")
- return port
- # 随机 获取一个ip
- def get_random_ip(redis_conn):
- # 获取ip数量
- end_num = get_ip_num(redis_conn)
- if end_num == 0:
- # ip地址池没有ip了,应该去爬取更多的ip
- spider_ip(url_intr, redis_conn)
- return ""
- # 从现有ip数量中,获得一个随机编号
- num = random.randint(0, end_num-1)
- # 获得随机的ip
- random_ip = redis_conn.zrange("ip", num, num)
- # 如果随机ip获得不成功
- if not random_ip:
- return ""
- random_ip = str(random_ip[0]).replace("b", '').replace("'", "")
- port = get_port(random_ip, redis_conn)
- # 返回ip 和 port
- print("获得随机IP:{},{}".format(random_ip, port))
- return random_ip, port
- # 获得代理ip的接口
- def get_proxy_ip():
- # 获取ip数量
- end_num = get_ip_num(redis_conn)
- if end_num == 0:
- return ""
- # 从现有ip数量中,获得一个随机编号
- num = random.randint(0, end_num-1)
- print("获得redis的ip编号为:{}".format(num))
- # 获得随机的ip
- random_ip = redis_conn.zrange("ip", num, num)
- # 如果随机ip获得不成功
- if len(random_ip) == 0:
- return ""
- random_ip = str(random_ip[0]).replace("b", '').replace("'", "")
- port = get_port(random_ip, redis_conn)
- # 返回ip 和 port
- print("获得随机IP:{},{}".format(random_ip, port))
- return {'http': '{}:{}'.format(random_ip, port)}
- # 删除redis数据库里的ip
- def remove_ip(ip,redis_conn):
- redis_conn.zrem("ip", ip)
- print("已删除ip:{}".format(ip))
- # 检测ip是否可用
- def aps_detection_ip(redis_conn):
- # 获得一个随机ip
- res = get_random_ip(redis_conn)
- if not res:
- print("获得随机ip失败")
- return ""
- ip = res[0]
- port = res[1]
- try:
- print("{}:{} 检测中...".format(ip, port))
- requests.get("http://www.vrpip.com", proxies={'http': '{}:{}'.format(ip, port)}, timeout=5)
- print("可用ip:{}:{}".format(ip, port))
- except Exception:
- # ip错误失效就删除
- remove_ip(ip, redis_conn)
- # 以后台的方式运行、15秒检测一个
- sched = BlockingScheduler()
- sched.add_job(aps_detection_ip, 'interval', seconds=6, args=[redis_conn])
- # 多线程 获取ip
- def ip_is_enable(ip, port):
- try:
- print("{}:{} 检测中...".format(ip, port))
- requests.get("http://www.vrpip.com", proxies={'http': '{}:{}'.format(ip, port)}, timeout=5)
- print("可用ip:{}:{}".format(ip, port))
- except Exception:
- # ip错误失效就删除
- remove_ip(ip, redis_conn)
- # 多线程 开启检测ip
- def all_ip_thread():
- end_num = get_ip_num(redis_conn)
- for i in range(0, end_num):
- if i == end_num:
- i = i - 1
- ip = redis_conn.zrange("ip", i, i)
- ip = str(ip[0]).replace("b", '').replace("'", "")
- port = get_port(ip, redis_conn)
- ip_thread = threading.Thread(target=ip_is_enable, args=(ip, port), name="all_ip_thread_".format(i))
- ip_thread.start()
- ip_threads.append(ip_thread)
- def get_all_ip_thread():
- get_ip_thread_inha = threading.Thread(target=spider_ip, args=(url_inha, redis_conn))
- get_ip_thread_intr = threading.Thread(target=spider_ip, args=(url_intr, redis_conn))
- get_ip_thread_89 = threading.Thread(target=spider_ip, args=(url_89, redis_conn))
- get_ip_thread_xl = threading.Thread(target=spider_ip, args=(url_xl, redis_conn))
- get_ip_thread_inha.start()
- get_ip_thread_intr.start()
- get_ip_thread_89.start()
- get_ip_thread_xl.start()
- get_ip_threads.append(get_ip_thread_inha)
- get_ip_threads.append(get_ip_thread_intr)
- get_ip_threads.append(get_ip_thread_89)
- get_ip_threads.append(get_ip_thread_xl)
- if __name__ == '__main__':
- print(datetime.datetime.now())
- # 快代理 高匿分类
- # spider_ip(url_inha, redis_conn)
- # 快代理 普通分类
- # spider_ip(url_intr, redis_conn)
- # 89代理
- # spider_ip(url_89, redis_conn)
- # 西拉代理
- # spider_ip(url_xl, redis_conn)
- # 多线程 获取免费代理ip
- get_all_ip_thread()
- for t in get_ip_threads:
- t.join()
- # 多线程检测ip是否可用
- all_ip_thread()
- for t in ip_threads:
- t.join()
- # 定时任务 检测ip
- # sched.start()
复制代码
|
|