系统城装机大师 - 固镇县祥瑞电脑科技销售部宣传站!

当前位置:首页 > 数据库 > Redis > 详细页面

golang实现redis的延时消息队列功能示例

时间:2019-12-04来源:系统城作者:电脑系统城

前言

在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现。本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo。

提前准备 安装redis, redis-go

因为用的是macOS, 直接


 
  1. $ brew install redis
  2. $ go get github.com/garyburd/redigo/redis

又因为比较懒,生成任务的唯一id时,直接采用了bson中的objectId,所以:


 
  1. $ go get gopkg.in/mgo.v2/bson

唯一id不是必须有,但如果之后有实际应用需要携带,便于查找相应任务。

生产者

通过一个for循环生成10w个任务, 每一个任务有不同的时间


 
  1. func producer() {
  2. count := 0
  3. //生成100000个任务
  4. for count < 100000 {
  5. count++
  6. dealTime := int64(rand.Intn(5)) + time.Now().Unix()
  7. uuid := bson.NewObjectId().Hex()
  8. redis.Client.AddJob(&job.JobMessage{
  9. Id: uuid,
  10. DealTime: dealTime,
  11. }, + int64(dealTime))
  12. }
  13. }
  14.  

其中AddJob函数在另一个包中, 将上一个函数中随机生成的时间作为需要处理的时间戳.


 
  1. // 添加任务
  2. func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
  3. conn := client.Get()
  4. defer conn.Close()
  5.  
  6. key := "JOB_MESSAGE_QUEUE"
  7. conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
  8. }
  9.  

消费者

消费者处理流程分为两个步骤:

  • 获取小于等于当前时间戳的任务
  • 通过删除当前任务来判断谁获得了当前任务

因为在获取小于等于当前时间戳的任务时,可能有多个go routine同时读到了当前任务,而只有一个任务可以来处理当前任务。因此我们需要通过一个方案来判断究竟由谁来处理这个任务(当然如果只有一个消费者可以读到就直接处理):这个时候可以通过redis的删除操作来获取,因为删除指定value时只有成功的操作才会返回不为0,所以我们可以认为删除当前队列成功的那个go routine拿到了当前的任务。

下面是代码:


 
  1. // 消费者
  2. func consumer() {
  3. // 启动10个go routine一起去拿
  4. count := 0
  5. for count < 10 {
  6. go func() {
  7. for {
  8. jobs := redis.Client.GetJob()
  9. if len(jobs) <= 0 {
  10. time.Sleep(time.Second * 1)
  11. continue
  12. }
  13. currentJob := jobs[0]
  14. // 如果当前抢redis队列成功,
  15. if redis.Client.DelJob(currentJob) > 0 {
  16. var jobMessage job.JobMessage
  17. util.JsonDecode(currentJob, &jobMessage) //自定义的json解析函数
  18. handleMessage(&jobMessage)
  19. }
  20.  
  21. }
  22.  
  23. }()
  24. count++
  25. }
  26. }
  27.  
  28. // 处理任务用函数
  29. func handleMessage(msg *job.JobMessage) {
  30. fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
  31. go func() {
  32. countChan <- true
  33. }()
  34. }

redis部分的代码,获取任务和删除任务


 
  1. // 获取任务
  2. func (client *RedisClient) GetJob() []string {
  3. conn := client.Get()
  4. defer conn.Close()
  5.  
  6. key := "JOB_MESSAGE_QUEUE"
  7. timeNow := time.Now().Unix()
  8. ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
  9. if err != nil {
  10. panic(err)
  11. }
  12. return ret
  13. }
  14.  
  15. // 删除当前任务, 用来判断是否抢到了当前任务
  16. func (client *RedisClient) DelJob(value string) int {
  17. conn := client.Get()
  18. defer conn.Close()
  19.  
  20. key := "JOB_MESSAGE_QUEUE"
  21. ret, err := redis.Int(conn.Do("zrem", key, value))
  22. if err != nil {
  23. panic(err)
  24. }
  25. return ret
  26. }
  27.  

代码大抵如此。最后跑起来之后,大概每3-4秒钟能够处理掉1w个任务,速度上确实是...

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

分享到:

相关信息

  • redis实现session共享的方法

    引言大厂很多项目都是部署到多台服务器上,这些服务器在各个地区都存在,当我们访问服务时虽然执行的是同一个服务,但是可能是不同服务器运行的;在我学习项目时遇到这样一个登录情...

    2023-11-01

  • 简单聊一聊redis过期时间的问题

    1.多次修改一个redis的String过期键,如何保证他仍然能保留第一次设置时的删除时间 2.修改hash、set、Zset、list的值,会使过期时间重置吗?...

    2023-11-01

系统教程栏目

栏目热门教程

人气教程排行

站长推荐

热门系统下载