Table of contents
  1. Shell script batch import
  2. Java code implementation

I recently received a requirement to pre-warm a cache by importing tens of millions of records into Redis. Inserting them one at a time would take far too long, so I looked into Redis pipe mode for batch imports, which avoids the cost of one round trip per inserted record. This post records two implementations: one via a shell script, the other in application code. Both take a CSV file as input.

Shell script batch import

#!/bin/sh

# Start timestamp (the original read an unset $current variable; plain `date +%s` is what was intended)
timeStamp=$(date +%s)
echo $timeStamp

REDIS_HOST=localhost
REDIS_PORT=6379

KEYNAME=BFF:TIMER:
TEMPFILE=/c/Users/barry.cao/Desktop/Time.csv
OUTFILE=/c/Users/barry.cao/Desktop/test.text

# Split each CSV row on ',' and assemble one "SET key value" command per row;
# the value is the escaped JSON document expected in Redis.
cat $TEMPFILE | awk -F "," -v key="$KEYNAME" '{print "SET " key $12 " \"{\\\"favorite\\\":\\\""$6"\\\",\\\"firstGoldTime\\\":\\\""$2"\\\",\\\"firstStore\\\":\\\""$11"\\\",\\\"partner_member_id\\\":\\\""$12"\\\",\\\"since\\\":\\\""$1"\\\",\\\"topLevel\\\":\\\""$3"\\\",\\\"totalCities\\\":"$7",\\\"totalFreeCups\\\":"$5",\\\"totalPoints\\\":"$4",\\\"totalStores\\\":"$8",\\\"totalVolume\\\":\\\""$9"\\\",\\\"withFriendsVisits\\\":"$10"}\""}' > $OUTFILE

# redis-cli --pipe expects CRLF line terminators
unix2dos $OUTFILE

timeStamp2=$(date +%s)
echo $timeStamp2
echo "File generation took: $(($timeStamp2-$timeStamp))s"

cat $OUTFILE | redis-cli -h $REDIS_HOST -p $REDIS_PORT --pipe

timeStamp3=$(date +%s)
echo $timeStamp3
echo "Redis import took: $(($timeStamp3-$timeStamp2))s"

Run this sh file on the Redis server.
The script records how long each step takes. It first processes the incoming CSV file: awk splits each line on ',' and, following the format required in Redis, assembles one `SET key value` command per row into a text file. The file is then converted with unix2dos (install it first if the command is missing), because redis-cli's pipe mode expects CRLF line endings, and finally fed into Redis with `redis-cli --pipe`.
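As a minimal sketch of the awk step, here is the same transform on a made-up three-field row (field names are illustrative, covering only two of the JSON attributes):

```shell
# Toy version of the transform above: field 3 is the member id (key suffix),
# fields 1-2 become escaped JSON attributes in the value.
echo '2020-01-01,gold,123' | awk -F ',' \
  '{print "SET BFF:TIMER:" $3 " \"{\\\"since\\\":\\\"" $1 "\\\",\\\"topLevel\\\":\\\"" $2 "\\\"}\""}'
# Prints: SET BFF:TIMER:123 "{\"since\":\"2020-01-01\",\"topLevel\":\"gold\"}"
```

The doubled escaping exists because awk's string literals consume one level of backslashes and the generated `SET` command needs the quotes escaped inside the quoted JSON value.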

Java code implementation

56
// Illustrative method signature; the original snippet began with a bare brace
public void importCsv(String filePath) throws IOException {
    ArrayList<String> csvs = new ArrayList<>();
    CsvReader reader = new CsvReader(filePath, ',', Charset.forName("UTF-8"));
    // Skip the header row; leave this commented out if the file has no header
    //reader.readHeaders();

    while (reader.readRecord()) {
        String[] strs = reader.getValues();
        String customerId = strs[0];

        // String content must be compared with equals()/isEmpty(), not != / ==
        if (customerId != null && !customerId.trim().isEmpty()) {
            csvs.add(customerId);
        }
        // Flush every million records, then clear the buffer so the same
        // batch is not re-sent on the next flush
        if (csvs.size() >= 1000000) {
            batchCacheCustomerIdInfo(csvs);
            csvs.clear();
        }
    }

    // Flush the remaining partial batch
    if (csvs.size() > 0) {
        batchCacheCustomerIdInfo(csvs);
    }
}

private void batchCacheCustomerIdInfo(List<String> dataList) {
    // Pipeline the whole batch: commands are queued and sent in one round trip
    redisTemplate.executePipelined(new RedisCallback<List<Object>>() {
        @Override
        public List<Object> doInRedis(RedisConnection connection) throws DataAccessException {
            for (String customerId : dataList) {
                String key = "CUSTOMER_CENTER_MEMBERID";
                byte[] rawKey = redisTemplate.getKeySerializer().serialize(key);
                connection.lPush(rawKey, redisTemplate.getValueSerializer().serialize(customerId));
            }
            // Must return null: executePipelined collects the results itself
            return null;
        }
    });
}

private void batchCacheBearInfo(List<String> dataList, long expire) {
    // Same pipelined pattern, but writing plain key-value entries
    redisTemplate.executePipelined(new RedisCallback<List<Object>>() {
        @Override
        public List<Object> doInRedis(RedisConnection connection) throws DataAccessException {
            for (String openId : dataList) {
                String key = RedisHashKey.BFF_ILLEGAL_OPENID + openId;
                byte[] rawKey = redisTemplate.getKeySerializer().serialize(key);
                if (expire > 0) {
                    // SETEX: set the key with a TTL in seconds
                    connection.setEx(rawKey, expire, redisTemplate.getValueSerializer().serialize("YES"));
                } else {
                    // SETNX: set only if the key does not already exist
                    connection.setNX(rawKey, redisTemplate.getValueSerializer().serialize("YES"));
                }
            }
            return null;
        }
    });
}

An endpoint can accept a CSV file, read it line by line with CsvReader, collect the values into a List, and import them into Redis in batches via batchCacheCustomerIdInfo; this code pushes all entries into a single Redis list. batchCacheBearInfo shows the same pipelined pattern for key-value data, optionally with an expiry.
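One more note on the `--pipe` mode the shell approach relies on: besides plain inline commands, redis-cli's pipe mode also accepts the raw Redis protocol (RESP), which the official mass-insertion guidance recommends for very large imports because it needs no quoting or escaping of the payload. A minimal sketch (the `emit_set` helper name is made up):

```shell
# emit_set KEY VALUE — print one SET command in raw RESP protocol:
# an array of 3 bulk strings, each prefixed with its byte length.
emit_set() {
  printf '*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n' \
    "${#1}" "$1" "${#2}" "$2"
}

# Hypothetical usage: emit RESP for each CSV row and feed it straight to Redis:
#   while IFS=, read -r id value; do
#     emit_set "BFF:TIMER:$id" "$value"
#   done < Time.csv | redis-cli --pipe
emit_set foo bar
```

Because each value is length-prefixed, JSON payloads full of quotes and commas pass through untouched, with no need for the backslash escaping the awk version performs.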
