By Macintosh-c
I recently picked up a requirement to pre-warm data, which meant importing tens of millions of records into Redis. The usual row-by-row insert would have taken far too long, since every single insert pays for establishing a connection, so I looked into the Redis pipe for bulk importing. This post records two implementations: one bulk-imports through a shell script, the other does it in application code. Both take a CSV file as input.
Bulk import with a shell script
```sh
#!/bin/sh

startTime=$(date +%s)
echo $startTime

REDIS_HOST=localhost
REDIS_PORT=6379
KEYNAME=BFF:TIMER:
TEMPFILE=/c/Users/barry.cao/Desktop/Time.csv
OUTFILE=/c/Users/barry.cao/Desktop/test.text

# Turn each CSV row into a single SET command, assembling the JSON value from the fields
cat $TEMPFILE | awk -F "," '{print "SET BFF:TIMER:" $12 " \"{\\\"favorite\\\":\\\""$6"\\\",\\\"firstGoldTime\\\":\\\""$2"\\\",\\\"firstStore\\\":\\\""$11"\\\",\\\"partner_member_id\\\":\\\""$12"\\\",\\\"since\\\":\\\""$1"\\\",\\\"topLevel\\\":\\\""$3"\\\",\\\"totalCities\\\":"$7",\\\"totalFreeCups\\\":"$5",\\\"totalPoints\\\":"$4",\\\"totalStores\\\":"$8",\\\"totalVolume\\\":\\\""$9"\\\",\\\"withFriendsVisits\\\":"$10"}\""}' > $OUTFILE

# redis-cli --pipe expects CRLF line endings; install unix2dos first if it is missing
unix2dos $OUTFILE

genTime=$(date +%s)
echo $genTime
echo "File generation took: $(($genTime - $startTime))s"

cat $OUTFILE | redis-cli -h $REDIS_HOST -p $REDIS_PORT --pipe

endTime=$(date +%s)
echo $endTime
echo "Import took: $(($endTime - $genTime))s"
```
Run this .sh file on the Redis server.
The script logs how long each step takes. It first processes the incoming CSV file: awk splits each line on ',' and uses the fields as inputs to assemble one single-record SET key value command in the format the Redis data requires, writing all the commands to a text file. That file is then converted with unix2dos (install it first if the command is missing) and finally fed into the database with redis-cli --pipe.
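To make the generated file concrete, here is a hypothetical input row (the field values below are invented for illustration; the real file carries at least twelve comma-separated columns) together with the SET command the awk step would emit for it:

```
# hypothetical row in Time.csv
2015-06-01,2016-03-12,GOLD,12800,3,latte,5,42,120.5,7,Store001,M10001

# corresponding line in test.text
SET BFF:TIMER:M10001 "{\"favorite\":\"latte\",\"firstGoldTime\":\"2016-03-12\",\"firstStore\":\"Store001\",\"partner_member_id\":\"M10001\",\"since\":\"2015-06-01\",\"topLevel\":\"GOLD\",\"totalCities\":5,\"totalFreeCups\":3,\"totalPoints\":12800,\"totalStores\":42,\"totalVolume\":\"120.5\",\"withFriendsVisits\":7}"
```

When this file is streamed through redis-cli --pipe, the client sends every command over a single connection and prints a summary of replies and errors at the end; not paying a round trip per command is where the bulk of the speedup comes from.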
Java implementation
```java
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import com.csvreader.CsvReader;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;

// These methods live in a Spring bean with an injected RedisTemplate (redisTemplate).
public void importCustomerIds(String filePath) throws IOException {
    List<String> csvs = new ArrayList<>();
    CsvReader reader = new CsvReader(filePath, ',', Charset.forName("UTF-8"));
    // Skip the header row; comment this out if the file has no header
    //reader.readHeaders();
    while (reader.readRecord()) {
        String[] strs = reader.getValues();
        String customerId = strs[0];
        if (customerId != null && !customerId.trim().isEmpty()) {
            csvs.add(customerId);
        }
        // Flush to Redis in batches of one million records, then reset the buffer
        if (csvs.size() >= 1000000) {
            batchCacheCustomerIdInfo(csvs);
            csvs.clear();
        }
    }
    reader.close();
    // Flush whatever is left over
    if (!csvs.isEmpty()) {
        batchCacheCustomerIdInfo(csvs);
    }
}

private void batchCacheCustomerIdInfo(List<String> dataList) {
    // Pipeline: queue all commands on one connection instead of one round trip each
    redisTemplate.executePipelined(new RedisCallback<List<Object>>() {
        @Override
        public List<Object> doInRedis(RedisConnection connection) throws DataAccessException {
            for (String customerId : dataList) {
                String key = "CUSTOMER_CENTER_MEMBERID";
                byte[] rawKey = redisTemplate.getKeySerializer().serialize(key);
                connection.lPush(rawKey, redisTemplate.getValueSerializer().serialize(customerId));
            }
            return null;
        }
    });
}

private void batchCacheBearInfo(List<String> dataList, long expire) {
    // Same pipelined approach, but writing key-value pairs instead of one list
    redisTemplate.executePipelined(new RedisCallback<List<Object>>() {
        @Override
        public List<Object> doInRedis(RedisConnection connection) throws DataAccessException {
            for (String openId : dataList) {
                String key = RedisHashKey.BFF_ILLEGAL_OPENID + openId;
                byte[] rawKey = redisTemplate.getKeySerializer().serialize(key);
                if (expire > 0) {
                    connection.setEx(rawKey, expire, redisTemplate.getValueSerializer().serialize("YES"));
                } else {
                    connection.setNX(rawKey, redisTemplate.getValueSerializer().serialize("YES"));
                }
            }
            return null;
        }
    });
}
```
A CSV file can be received through an API endpoint; CsvReader reads it line by line and buffers the records into a List, and batchCacheCustomerIdInfo then bulk-imports each batch into Redis. This code pushes every record onto a single Redis list; batchCacheBearInfo shows the same pipelined approach for key-value data, with an optional expiry.
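As a usage sketch, one way to wire this up end to end is a Spring MVC upload endpoint that stages the file on disk and hands the path to the import method above. The controller, mapping path, and service name below are illustrative assumptions, not part of the original code:

```java
import java.io.File;
import java.io.IOException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

@RestController
public class CustomerImportController {

    // Hypothetical service bean wrapping the import methods shown above
    @Autowired
    private CustomerImportService customerImportService;

    @PostMapping("/customers/import")
    public String importCustomers(@RequestParam("file") MultipartFile file) throws IOException {
        // Stage the upload on disk so CsvReader can open it by path
        File temp = File.createTempFile("customer-import-", ".csv");
        file.transferTo(temp);
        try {
            customerImportService.importCustomerIds(temp.getAbsolutePath());
        } finally {
            temp.delete();
        }
        return "OK";
    }
}
```

Reading into a bounded buffer and flushing in batches keeps memory usage flat even for tens of millions of rows, while each flush still amortizes the network round trips through the pipeline.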