Scrapy爬取B站用户及优化

简介

因为实习在公司做的都是爬虫的二次开发(单纯开发spider),很想尝试一下自己开发一个之后优化。另外一个,秋招来临,感觉项目有点不对口,算是补充一下。

主要的过程,先把基本的爬虫写出来,写好MySQL存储。之后添加IPProxy代理池,之后加入Scrapy-Redis,将阿里云作为master,再之后就是添加MySQL连接池。

Scrapy基本模块

B站用户网站分析

选择要爬取的信息

随机打开B站的一个up主(可能也不是随机的)

我们可以看到名字,性别,会员等级,关注数,粉丝数,播放数。

另外一些是用户的个人信息,生日,地点,注册时间,uid。
点开了另外几个up主的主页,可以看到uid的长度是有区别的。另外数据库也很有可能随着uid递增,所以测试一下:
uid为1的:

uid为2的:

貌似发现了什么不得了的东西……
另外,可以看到注册时间也是符合猜测的。

因为B站的用户基本信息之前已经有人爬过,所以我想多爬点的来作数据分析:

就是up主做的视频分类及数量。这样可以分析出那个分类的up主比较多。

分析信息入口

打开postman,分析https://space.bilibili.com/546195,可以看到没有啥信息。

打开Chrome的开发者工具F12,选择Network->XHR:

所以只要对https://space.bilibili.com/ajax/member/GetInfoPOST用户的uid就好了。

另外一个是视频的分类:

https://space.bilibili.com/ajax/member/getSubmitVideos,试了一下,其他的参数可以不加,只需要uid。
所以信息的入口就是这样,没有用到xpath解析之类的东西。

编写items.py

信息分析完了之后就可以写items.py,其他挺好写的,就是视频分类这里,因为考虑到B站所有用户里面有做视频的可能还是占少数的,而且每个up主的视频分类也不会太广泛,所以每个分类建一个字段我觉得有些浪费和麻烦。所以最后还是存成dict后转为json在数据库里存储为Blob

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# -*- coding: utf-8 -*-
import scrapy

class BilibiliItem(scrapy.Item):
uid = scrapy.Field()
name = scrapy.Field()
space =scrapy.Field()
sex = scrapy.Field()
birthday = scrapy.Field()
address = scrapy.Field()
level = scrapy.Field()
regtime = scrapy.Field()
fans = scrapy.Field()
follows = scrapy.Field()
playnum = scrapy.Field()
videonum = scrapy.Field()
videocate = scrapy.Field()

编写biliup.py(spider)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# -*- coding: utf-8 -*-

import copy
import json
import time
import scrapy
import logging
from scrapy import Request
from scrapy import FormRequest
from bilibili.items import BilibiliItem
import sys
reload(sys)
sys.setdefaultencoding('utf8')

class BiliupSpider(scrapy.Spider):
name = 'biliup'

def start_requests(self):
headers = {'Referer': 'http://space.bilibili.com/1'}
url = 'https://space.bilibili.com/ajax/member/GetInfo'
for mid in range(1, 170000000):
data = {'mid': str(mid)}
yield FormRequest(url, callback=self.parse_up, headers=headers, formdata=data)

def parse_up(self, response):
user = json.loads(response.text)
if not user['status']:
return
user = user['data']
item = BilibiliItem()
item['uid'] = user['mid']
item['name'] = user['name']
item['space'] = 'https://space.bilibili.com/' + item['uid']
item['sex'] = user['sex']
try:
item['birthday'] = user['birthday'][-5:]
except KeyError:
item['birthday'] = ''
try:
item['address'] = user['place']
except KeyError:
item['address'] = ''
item['level'] = user['level_info']['current_level']
try:
t = time.localtime(user['regtime'])
item['regtime'] = time.strftime('%Y-%m-%d',t)
except KeyError:
item['regtime'] = ''
item['fans'] = user['fans']
item['follows'] = user['attention']
item['playnum'] = user['playNum']

url = 'https://space.bilibili.com/ajax/member/getSubmitVideos?mid=' + item['uid']
yield Request(url, callback=self.parse_video, meta={'userdata': item})

def parse_video(self, response):
item = response.meta['userdata']
data = json.loads(response.text)
if not data['status']:
return
data = data['data']
item['videonum'] = data['count']
if data['tlist']:
videocate = {}
for i in data['tlist']:
videocate[data['tlist'][i]['name']] = data['tlist'][i]['count']
item['videocate'] = videocate
else:
item['videocate'] = ''
yield item

从头说起,重写start_requests(),当时测试了一下,有170000000多用户,这会儿也很有可能到两亿了。这个到后面存储得估计得分表,或者换个别的。

之后加个Headerspost用户的uid,回调到parse_up(),json读出数据之后再GEThttps://space.bilibili.com/ajax/member/getSubmitVideos,带上解析的数据meta,回调到parse_video()解析出视频的数据,返回item

编写pipelines.py

存储可以JSON,也有MySQL存储:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# -*- coding: utf-8 -*-

import json
import codecs
import logging

class BilibiliJsonPipeline(object):
def __init__(self):
self.file = codecs.open('biliup.json', 'w', encoding='utf-8')
def process_item(self, item, spider):
line = json.dumps(dict(item), ensure_ascii=False) + "\n"
self.file.write(line)
return item
def spider_closed(self, spider):
self.file.close()


import MySQLdb

class BilibiliMysqlPipeline(object):
def __init__(self):
self.conn = MySQLdb.connect(host='localhost', port=3306, user='root', passwd='123456', db='biliup')
self.conn.set_character_set('utf8')
self.cursor = self.conn.cursor()
self.cursor.execute('SET NAMES utf8;')
self.cursor.execute('SET CHARACTER SET utf8;')
self.cursor.execute('SET character_set_connection=utf8;')

def process_item(self, item, spider):
uid = item['uid']
self.cursor.execute('select * from biliup where uid=%s' % (uid))
if self.cursor.fetchone():
sql = "update biliup set name='%s', space='%s', sex='%s', birthday='%s', address='%s', level='%s', regtime='%s', fans=%s, follows=%s, playnum=%s, videonum=%s, videocate='{json}' where uid=%s" % (item['name'],item['space'],item['sex'],item['birthday'],item['address'],item['level'],item['regtime'],item['fans'],item['follows'],item['playnum'],item['videonum'],item['uid'])
else:
if item['regtime']:
sql = "insert into biliup values(%s,'%s','%s','%s','%s','%s','%s','%s',%s,%s,%s,%s,'{json}')" % (item['uid'],item['name'],item['space'],item['sex'],item['birthday'],item['address'],item['level'],item['regtime'],item['fans'],item['follows'],item['playnum'],item['videonum'])
else:
sql = "insert into biliup(uid,name,space,sex,birthday,address,level,fans,follows,playnum,videonum,videocate) values(%s,'%s','%s','%s','%s','%s','%s',%s,%s,%s,%s,'{json}')" % (item['uid'],item['name'],item['space'],item['sex'],item['birthday'],item['address'],item['level'],item['fans'],item['follows'],item['playnum'],item['videonum'])
s = json.dumps(item['videocate'])
sql = sql.format(json=MySQLdb.escape_string(s))
self.cursor.execute(sql)
self.conn.commit()
return item

建库建表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE DATABASE biliup character set utf8;

CREATE TABLE biliup(
uid int(11) PRIMARY KEY,
name char(40) NOT NULL,
space char(80) NOT NULL,
sex char(8),
birthday char(5),
address char(20),
level int,
regtime date,
fans int,
follows int,
playnum int,
videonum int,
videocate blob(1024)
)DEFAULT CHARSET=utf8;

其中在JSON字符串存入MySQL的时候,字典的引号会发生错乱。在网上查找资料之后大约是以下步骤:

  1. 数据库字段设为Blob
  2. json.dumps将dict转换成str
  3. MySQLdb.escape_string()转换后用.format填充

Mysql存储的时候判断原表中是否存在已有的记录,如果有update,没有就insert。基本的模块就到这边,之后是改进和优化。

添加代理IP池

代理IP池是拿七夜的IPProxy,之前在公司用过。
在服务器上部署之后,数据库里就有代理IP可以用了。
settings.py里添加中间件设置:

1
2
3
DOWNLOADER_MIDDLEWARES = {
'bilibili.middlewares.ProxyMiddleware': 1,
}

接下来是编写代理中间件middlewares.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# -*- coding: utf-8 -*-
import random
import logging
import json
import requests
from datetime import datetime, timedelta
from twisted.internet.error import TimeoutError, ConnectionRefusedError, ConnectError
from twisted.web._newclient import ResponseNeverReceived
import os

class ProxyMiddleware(object):
# overwrite process request
DONT_RETRY_ERRORS = (TimeoutError, ConnectionRefusedError, ResponseNeverReceived, ConnectError, ValueError)

def __init__(self):
#获取IP池的接口
self.ip_pool_href = "http://127.0.0.1:8000/"
# 保存上次不用代理直接连接的时间点
self.last_no_proxy_time = datetime.now()
# 一定分钟数后切换回不用代理, 因为用代理影响到速度
self.recover_interval = 20
# 一个proxy如果没用到这个数字就被发现老是超时, 则永久移除该proxy. 设为0则不会修改代理文件.
self.dump_count_threshold = 20
# 是否在超时的情况下禁用代理
self.invalid_proxy_flag = True
# 当有效代理小于这个数时(包括直连), 刷新新的代理
self.extend_proxy_threshold = 5
# 初始化代理列表
self.proxyes = [{"proxy": None, "valid": True, "count": 0}]
# 初始时使用0号代理(即无代理)
self.proxy_index = 0
# 表示可信代理的数量(如自己搭建的HTTP代理)+1(不用代理直接连接)
self.fixed_proxy = len(self.proxyes)
# 上一次抓新代理的时间
self.last_fetch_proxy_time = datetime.now()
# 每隔固定时间强制抓取新代理(min)
self.fetch_proxy_interval = 120
# 一个将被设为invalid的代理如果已经成功爬取大于这个参数的页面, 将不会被invalid
self.invalid_proxy_threshold = 200
#是否是https 0代表http,1代表https
self.protocol=0

def url_in_proxyes(self, url):
"""
返回一个代理url是否在代理列表中
"""

for p in self.proxyes:
if url == p["proxy"]:
return True
return False
def reset_proxyes(self):
"""
将所有count>=指定阈值的代理重置为valid,
"""

for p in self.proxyes:
if p["count"] >= self.dump_count_threshold:
p["valid"] = True


def byteify(self,input):
if isinstance(input, dict):
return {self.byteify(key): self.byteify(value) for key, value in input.iteritems()}
elif isinstance(input, list):
return [self.byteify(element) for element in input]
elif isinstance(input, unicode):
return input.encode('utf-8')
else:
return input

def remove_invaild_proxy(self,ip):
ip = ip.split("//")[1]
ip = ip.split(":")[0]
r= requests.get(self.ip_pool_href+"delete?ip="+ip)
logging.info('remove invaild ip '+ip+" ret:"+r.text)



def fetch_new_proxyes(self):
"""
从网上抓取新的代理添加到代理列表中
"""

# new_proxyes = fetch_free_proxyes.fetch_all()
# logger.info("new proxyes: %s" % new_proxyes)
self.last_fetch_proxy_time = datetime.now()

r= requests.get(self.ip_pool_href+ ("?count=5&protocol=%d") % (self.protocol))
new_proxyes = self.byteify(json.loads(r.text))

for np in new_proxyes:
if self.url_in_proxyes("http://" + np[0]+":"+str(np[1])):
continue
else:
self.proxyes.append({"proxy": "http://" + np[0]+":"+str(np[1]),
"valid": True,
"count": 0})

#http 与 https
r_2= requests.get(self.ip_pool_href+ "?count=5&protocol=2")
new_proxyes_2 = self.byteify(json.loads(r_2.text))
for np in new_proxyes_2:
if self.url_in_proxyes("http://" + np[0]+":"+str(np[1])):
continue
else:
self.proxyes.append({"proxy": "http://" + np[0]+":"+str(np[1]),
"valid": True,
"count": 0})

if self.len_valid_proxy() < self.extend_proxy_threshold: # 如果发现抓不到什么新的代理了, 缩小threshold以避免白费功夫
self.extend_proxy_threshold -= 1
def len_valid_proxy(self):
"""
返回proxy列表中有效的代理数量
"""
count = 0
for p in self.proxyes:
if p["valid"]:
count += 1
return count
def inc_proxy_index(self):
"""
将代理列表的索引移到下一个有效代理的位置
如果发现代理列表只有fixed_proxy项有效, 重置代理列表
如果还发现已经距离上次抓代理过了指定时间, 则抓取新的代理
"""

assert self.proxyes[0]["valid"]
while True:
self.proxy_index = (self.proxy_index + 1) % len(self.proxyes)
if self.proxyes[self.proxy_index]["valid"]:
break

# 两轮proxy_index==0的时间间隔过短,扩展代理列表
if self.proxy_index == 0 and datetime.now() < self.last_no_proxy_time + timedelta(minutes=2):
logging.info("captcha thrashing")
self.fetch_new_proxyes()


if self.len_valid_proxy() <= self.fixed_proxy or self.len_valid_proxy() < self.extend_proxy_threshold: # 如果代理列表中有效的代理不足的话重置为valid
self.reset_proxyes()


if self.len_valid_proxy() < self.extend_proxy_threshold: # 代理数量仍然不足, 抓取新的代理
logging.info("valid proxy < threshold: %d/%d" % (self.len_valid_proxy(), self.extend_proxy_threshold))
self.fetch_new_proxyes()

logging.info("now using new proxy: %s" % self.proxyes[self.proxy_index]["proxy"])


def set_proxy(self, request):
"""
将request设置使用为当前的或下一个有效代理
"""
# self.inc_proxy_index()
proxy = self.proxyes[self.proxy_index]

if not proxy["valid"]:

self.inc_proxy_index()
proxy = self.proxyes[self.proxy_index]

if self.proxy_index == 0: # 每次不用代理直接下载时更新self.last_no_proxy_time
self.last_no_proxy_time = datetime.now()

if proxy["proxy"]:
request.meta["proxy"] = proxy["proxy"]
elif "proxy" in request.meta.keys():
del request.meta["proxy"]
request.meta["proxy_index"] = self.proxy_index
proxy["count"] += 1

def invalid_proxy(self, index):
"""
将index指向的proxy设置为invalid,
并调整当前proxy_index到下一个有效代理的位置
"""
if index < self.fixed_proxy: # 可信代理永远不会设为invalid
self.inc_proxy_index()
return

self.remove_invaild_proxy(self.proxyes[index]['proxy'])

if self.proxyes[index]["valid"]:
logging.info("invalidate %s" % self.proxyes[index])
self.proxyes[index]["valid"] = False
if index == self.proxy_index:
self.inc_proxy_index()


def process_request(self, request, spider):
"""
将request设置为使用代理
"""
if request.url.split(':')[0] == 'http':
self.protocol = 0
else:
self.protocol = 1

if self.proxy_index > 0 and datetime.now() > (self.last_no_proxy_time + timedelta(minutes=self.recover_interval)):
logging.info("After %d minutes later, recover from using proxy" % self.recover_interval)
self.last_no_proxy_time = datetime.now()
self.proxy_index = 0
# request.meta["dont_redirect"] = True # 有些代理会把请求重定向到一个莫名其妙的地址

# spider发现parse error, 要求更换代理
if "change_proxy" in request.meta.keys() and request.meta["change_proxy"]:
logging.info("change proxy request get by spider: %s" % request)
self.invalid_proxy(request.meta["proxy_index"])
request.meta["change_proxy"] = False
self.set_proxy(request)


def process_response(self, request, response, spider):
"""
检查response.status, 根据status是否在允许的状态码中决定是否切换到下一个proxy, 或者禁用proxy
"""
if "proxy" in request.meta.keys():
logging.info("%s %s %s" % (request.meta["proxy"], response.status, request.url))
else:
logging.info("None %s %s" % (response.status, request.url))

# status不是正常的200而且不在spider声明的正常爬取过程中可能出现的
# status列表中, 则认为代理无效, 切换代理

if response.status in (402,403,429,502,503):
logging.info("response status not in spider.website_possible_httpstatus_list")
self.invalid_proxy(request.meta["proxy_index"])
new_request = request.copy()
new_request.dont_filter = True
return new_request
else:
return response

def process_exception(self, request, exception, spider):
"""
处理由于使用代理导致的连接异常
"""
logging.error("%s exception: %s" % (self.proxyes[request.meta["proxy_index"]]["proxy"], exception))
# logging.error("%s exception: %s" % ("a", exception))

request_proxy_index = request.meta["proxy_index"]

# 只有当proxy_index>fixed_proxy-1时才进行比较, 这样能保证至少本地直连是存在的.
if isinstance(exception, self.DONT_RETRY_ERRORS):
if request_proxy_index > self.fixed_proxy - 1 and self.invalid_proxy_flag: # WARNING 直连时超时的话换个代理还是重试? 这是策略问题
if self.proxyes[request_proxy_index]["count"] < self.invalid_proxy_threshold:
self.invalid_proxy(request_proxy_index)
elif request_proxy_index == self.proxy_index: # 虽然超时,但是如果之前一直很好用,也不设为invalid
self.inc_proxy_index()
else: # 简单的切换而不禁用
if request.meta["proxy_index"] == self.proxy_index:
self.inc_proxy_index()
new_request = request.copy()
new_request.dont_filter = True
return new_request

中间件关键的是重写process_request()process_response()
process_request()主要是添加request.meta["proxy"];而process_response()主要是判断响应的状态,确定是否需要代理或者更换代理。

之后放到服务器去跑,结果开始时就被判为memory error,估计是一亿多个请求扛不住。想了想还是得用分布式,下面添加scrapy-redis。

scrapy-redis

知乎上的回答

我们把这100台中的99台运算能力较小的机器叫作slave,另外一台较大的机器叫作master,那么回顾上面代码中的url_queue,如果我们能把这个queue放到这台master机器上,所有的slave都可以通过网络跟master联通,每当一个slave完成下载一个网页,就向master请求一个新的网页来抓取。而每次slave新抓到一个网页,就把这个网页上所有的链接送到master的queue里去。同样,bloom filter也放到master上,但是现在master只发送确定没有被访问过的url给slave。

安装

pip install scrapy-redis

使用

安装完之后,可以去scrapy-redis的GitHub下载example示例,里面有三个爬虫,其中两个myspider_redismycrawler_redis分别是以spidercrawlspider为基类扩展的,这里参考的是myspider_redis

RedisSpider为基类,不用写start_urls,写redis_key,作为redis数据库查询url的键。之后默认采用的回调函数是parse()。我原项目首先发送的是FormRequest(),这里查询到源码用的是make_requests_from_url()发送请求。所以这里我修改了,再套一个函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import copy
import json
import time
import scrapy
import logging
from scrapy import Request
from scrapy import FormRequest
from bilibili.items import BilibiliItem
from scrapy_redis.spiders import RedisSpider
import sys
reload(sys)
sys.setdefaultencoding('utf8')

__author__ = 'Rexking'

class BiliupSpider(RedisSpider):
name = 'biliup'
redis_key = 'biliup:start_urls'

def parse(self, response):
headers = {'Referer': 'http://space.bilibili.com/1'}
url = 'https://space.bilibili.com/ajax/member/GetInfo'
mid = response.url.split('/')[-1]
data = {'mid': mid}
yield FormRequest(url, callback=self.parse_up, headers=headers, formdata=data)

def parse_up(self, response):
user = json.loads(response.text)
if not user['status']:
return
user = user['data']
item = BilibiliItem()
item['uid'] = user['mid']
item['name'] = user['name']
item['space'] = 'https://space.bilibili.com/' + item['uid']
item['sex'] = user['sex']
try:
item['birthday'] = user['birthday'][-5:]
except KeyError:
item['birthday'] = ''
try:
item['address'] = user['place']
except KeyError:
item['address'] = ''
item['level'] = user['level_info']['current_level']
try:
t = time.localtime(user['regtime'])
item['regtime'] = time.strftime('%Y-%m-%d',t)
except KeyError:
item['regtime'] = ''
item['fans'] = user['fans']
item['follows'] = user['attention']
item['playnum'] = user['playNum']

url = 'https://space.bilibili.com/ajax/member/getSubmitVideos?mid=' + item['uid']
yield Request(url, callback=self.parse_video, meta={'userdata': item})

def parse_video(self, response):
item = response.meta['userdata']
data = json.loads(response.text)
if not data['status']:
return
data = data['data']
item['videonum'] = data['count']
if data['tlist']:
videocate = {}
for i in data['tlist']:
videocate[data['tlist'][i]['name']] = data['tlist'][i]['count']
item['videocate'] = videocate
else:
item['videocate'] = ''
yield item

另外的,需要在settings.py中添加以下设置:

1
2
3
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_URL = 'redis://root:cRq138@115.28.168.74:6379'

在redis数据库里取url,是靠调度器scheduler来调度的。另外一个对urls的去重,这里采用了bloom filter来判定,具体可以参考:https://llimllib.github.io/bloomfilter-tutorial/。原本的scrapy去重是通过set()的,数量一上去效率就下来了。

另外一个REDIS_URL用来设定远程的redis数据库IP、端口、用户名和密码,如果不设定,则用的是本地的redis数据库。到这里scrapy-redis就添加好了,另外需要添加的是在远程redis数据库打入一亿多个url。目前服务器在跑,不知道会不会出什么差错,再看看。

MySQL连接池

查了一下资料,有MySQL原生的connector,也有SQLAlchemy的session内置连接池。SQLAlchemy使用得学习一下ORM层的用法。这里就用MySQL原生的。

所以pipelines.py修改为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# -*- coding: utf-8 -*-

import json
import codecs
import logging


class BilibiliJsonPipeline(object):
def __init__(self):
self.file = codecs.open('biliup.json', 'w', encoding='utf-8')
def process_item(self, item, spider):
line = json.dumps(dict(item), ensure_ascii=False) + "\n"
self.file.write(line)
return item
def spider_closed(self, spider):
self.file.close()


import MySQLdb
import mysql.connector as conner
from bilibili.settings import MYSQL_HOST, MYSQL_USER, MYSQL_PASSWD, MYSQL_DBNAME, MYSQL_POOLNAME, MYSQL_POOLSIZE


class BilibiliMysqlPipeline(object):
def __init__(self):
self.conn = conner.connect(host=MYSQL_HOST, port=3306, user=MYSQL_USER, passwd=MYSQL_PASSWD, db=MYSQL_DBNAME, pool_name=MYSQL_POOLNAME, pool_size=MYSQL_POOLSIZE)
self.cursor = self.conn.cursor()
self.cursor.execute('SET character_set_server=utf8;')
self.cursor.execute('SET NAMES utf8;')
self.cursor.execute('SET CHARACTER SET utf8;')
self.cursor.execute('SET character_set_connection=utf8;')

def process_item(self, item, spider):
uid = item['uid']
self.cursor.execute('select * from biliup where uid=%s' % (uid))
if self.cursor.fetchone():
sql = "update biliup set name='%s', space='%s', sex='%s', birthday='%s', address='%s', level='%s', regtime='%s', fans=%s, follows=%s, playnum=%s, videonum=%s, videocate='{json}' where uid=%s" % (item['name'],item['space'],item['sex'],item['birthday'],item['address'],item['level'],item['regtime'],item['fans'],item['follows'],item['playnum'],item['videonum'],item['uid'])
else:
if item['regtime']:
sql = "insert into biliup values(%s,'%s','%s','%s','%s','%s','%s','%s',%s,%s,%s,%s,'{json}')" % (item['uid'],item['name'],item['space'],item['sex'],item['birthday'],item['address'],item['level'],item['regtime'],item['fans'],item['follows'],item['playnum'],item['videonum'])
else:
sql = "insert into biliup(uid,name,space,sex,birthday,address,level,fans,follows,playnum,videonum,videocate) values(%s,'%s','%s','%s','%s','%s','%s',%s,%s,%s,%s,'{json}')" % (item['uid'],item['name'],item['space'],item['sex'],item['birthday'],item['address'],item['level'],item['fans'],item['follows'],item['playnum'],item['videonum'])
s = json.dumps(item['videocate'])
sql = sql.format(json=MySQLdb.escape_string(s))
self.cursor.execute(sql)
self.conn.commit()
return item

一分一毛也是心意