【临实战】使用 Python 从 Redis 中删除 4000W 个 KEY

2018-01-13 10:52:52来源:https://juejin.im/post/5a58946a6fb9a01c9c1ef753作者:稀土掘金人点击

分享

本文主要涉及 Redis 的以下两个操作和其 Python 实现,目录:


SCAN 命令


DEL 命令


使用 Python SCAN


使用 Python DEL


成果展示


SCAN 命令

SCAN 命令及相关的 SSCAN、HSCAN 和 ZSCAN 命令都用于增量迭代(incrementally iterate)一个集合的元素(a collection of elements):


SCAN 用于迭代当前数据库中的数据库键


SSCAN 用于迭代集合键中的元素


HSCAN 用于迭代哈希键中的键值对


ZSCAN 用于迭代有序集合中的元素(包括元素分值和元素分值)


以上四列命令都支持增量迭代,每次执行都会返回少量元素,所以他们都可以用于生产环境,而不会出现像 KEYS、SMEMBERS 命令一样 -- 可能会阻塞服务器


不过,增量式迭代命令也不是没有缺点的:


举个例子,使用 SMEMBERS 命令可以返回集合键当前包含的所有元素,但是对于 SCAN 这类增量迭代命令来说,因为在堆键进行增量迭代的过程中,键可能会被改变,所以增量式迭代命令只能对被返回的元素提供有限的保证(offer limited guarantees about the returned elements)。


因为 SCAN、SSCAN、HSCAN 和 ZSCAN 命令的工作方式都非常相似,但是要记住:


SSCAN、HSCAN 和 ZSCAN 命令的第一个参数总是一个数据库键;


SCAN 命令则不需要在第一个参数提供任何数据库键 -- 因为它迭代的是当前数据库中的所有数据库键。


SCAN 命令的基本用法


SCAN 命令是一个基于游标的迭代器(cursor based iterator):


SCAN 命令每次被调用后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新游标作为 SCAN 命令的游标参数,以此来延续之前的迭代过程。


当 SCAN 命令的游标参数被设置为 0 时,服务器开始一次新的迭代,而当服务器向用户返回值为 0 的游标时,表示迭代结束。


示例:


redis 127.0.0.1:6379> scan 0
1) "17"
2)1) "key:12"
2) "key:8"
3) "key:4"
4) "key:14"
5) "key:16"
6) "key:17"
7) "key:15"
8) "key:10"
9) "key:3"
10) "key:7"
11) "key:1"
redis 127.0.0.1:6379> scan 17
1) "0"
2) 1) "key:5"
2) "key:18"
3) "key:0"
4) "key:2"
5) "key:19"
6) "key:13"
7) "key:6"
8) "key:9"
9) "key:11"

上面的例子中,第一次迭代用 0 作为游标,表示开始第一次迭代。


第二次迭代使用第一次迭代时返回的游标,即:17。


从示例可以看出,SCAN 命令的返回是一个两个元素的数组,第一个元素是新游标,第二个元素也是一个数组,包含有所被包含的元素。


第二次调用 SCAN 命令时,返回游标 0,这表示迭代已经结束了,整个数据集(collection)已经被完整遍历过一遍了。


这个过程被称为一次完整遍历(full iteration)。


精简一下内容,补充三点:


因为 SCAN 命令仅仅使用游标来记录迭代状态,所以在迭代过程中,如果这个数据集的元素有增减,如果是减,不保证元素不返回;如果是增,也不保证一定返回;而且在某种情况下同一个元素还可能被返回多次。所以对迭代返回的元素所执行的操作最好可以重复执行多次(幂等)。


增量迭代命令不保证每次迭代所返回的元素数量(没扫到嘛),但是我们可以使用 COUNT 选项对命令的行为进行一定程度的调整。COUNT 参数的默认值为 10,在迭代一个足够大的、由哈希表实现的数据库、集合键、哈希键或者有序集合键时,如果用户没有使用 MATCH 选项,那么命令返回的数量通常和 COUNT 选项指定的一样,或者多一些(:sweat:),在迭代编码为整数集合(intset:一个由整数值构成的小集合)或编码为压缩列表(ziplist:由不同值构成的一个小哈希或者一个小有序集合)时,会无视 COUNT 选项指定的值,在第一次迭代就将数据集的所有元素都返回给用户。


MATCH 选项,直接看示例吧,如下


示例:


redis 127.0.0.1:6379> sadd myset 1 2 3 foo foobar feelsgood
(integer) 6
redis 127.0.0.1:6379> sscan myset 0 match f*
1) "0"
2) 1) "foo"
2) "feelsgood"
3) "foobar"

注意:对元素的模式匹配工作是在命令从数据集中取出元素之后,向客户端返回元素之前进行的,所以有可能返回空


示例:


redis 127.0.0.1:6379> scan 0 MATCH *11*
1) "288"
2) 1) "key:911"
redis 127.0.0.1:6379> scan 288 MATCH *11*
1) "224"
2) (empty list or set)
redis 127.0.0.1:6379> scan 224 MATCH *11*
1) "80"
2) (empty list or set)
redis 127.0.0.1:6379> scan 80 MATCH *11*
1) "176"
2) (empty list or set)
redis 127.0.0.1:6379> scan 176 MATCH *11* COUNT 1000
1) "0"
2)1) "key:611"
2) "key:711"
3) "key:118"
4) "key:117"
5) "key:311"
6) "key:112"
7) "key:111"
8) "key:110"
9) "key:113"
10) "key:211"
11) "key:411"
12) "key:115"
13) "key:116"
14) "key:114"
15) "key:119"
16) "key:811"
17) "key:511"
18) "key:11"

注意:最后一次迭代,通过 COUNT 选项指定为 1000 强制命令为本次迭代扫描更多元素,从而使返回的元素也变多了。


DEL 命令

这个比较简单,删除给定的一个或者多个 key


redis> SET name "redis"
OK
redis> SET type "key-value store"
OK
redis> SET website "redis.com"
OK
redis> DEL name type website
(integer) 3
使用 Python SCAN

安装 redis 包


pip install redis

完整代码示例:


import redis
pool=redis.ConnectionPool(host='redis_hostname', port=6379, max_connections=100)
r = redis.StrictRedis(connection_pool=pool)
cursor_number, keys = r.execute_command('scan', 0, "count", 200000)
while True:
if cursor_number == 0:
# 结束一次完整的比遍历
break
cursor_number, keys = r.execute_command('scan', cursor_number, "count", 200000)
# do something with keys

我将需要删除的 key 存在一个文件里,有 2.2G,大概 4000W 个,下一步就是删除了


使用 Python DEL

因为文件很大,我们用到一个小技巧,分块读取


with open("/data/rediskeys") as kf:
lines = kf.readlines(1024*1024)

调用 delete 方法时,用到一个小技巧就是『*』星号


r.delete(*taskkey_list)

我们看一下定义就清楚了:



delete method


放上完整代码:


import redis
import time
pool=redis.ConnectionPool(host='redis_hostname', port=6379, max_connections=100)
r = redis.StrictRedis(connection_pool=pool)
start_time = time.time()
SUCCESS_DELETED = 0
with open("/data/rediskeys") as kf:
while True:
lines = kf.readlines(1024*1024)
if not lines:
break
else:
taskkey_list = [i.strip() for i in lines if i.startswith("UCS:TASKKEY")]
SUCCESS_DELETED += r.delete(*taskkey_list)
print SUCCESS_DELETED
end_time = time.time()
print end_time - start_time, SUCCESS_DELETED
成果展示

结束,下篇再见



我的知乎 ·我的知乎专栏 · 我的 GitHub · 我的 Gist


最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台