背景知识 Pub/Sub Redis中使用SUBSCRIBE , UNSUBSCRIBE 和 PUBLISH 方法将可以实现生产者/消费者模式,所谓的生产者其实就是使用PUBLISH
将message(这里需要说明,message只能是String,不过我们可以序列化一个对象为Json再进行推送)推送到指定的队列中,再由SUBSCRIBE
监听到消息并取出。UNSUBSCRIBE
是取消监听。
具体来说我们可以打开两个redis-cli来进行交互
1 2 3 4 5 6 7 8 9 10 11 12 13 14 127.0 .0 .1 :6379 > SUBSCRIBE first_channel second_channelReading messages... (press Ctrl-C to quit) 1 ) "subscribe" 2 ) "first_channel" 3 ) (integer) 1 1 ) "subscribe" 2 ) "second_channel" 3 ) (integer) 2 1 ) "message" 2 ) "first_channel" 3 ) "first_message" 1 ) "message" 2 ) "second_channel" 3 ) "second_message"
1 2 3 4 5 127.0 .0 .1 :6379 [1 ]> PUBLISH first_channel first_message(integer) 1 127.0 .0 .1 :6379 [1 ]> PUBLISH second_channel second_message(integer) 1 127.0 .0 .1 :6379 [1 ]>
这里需要说明的是SUBSCRIBE
可以同时监听多个队列,这里对我们实现消息队列中的Topic
是至关重要的。
功能实现 这里我们使用Python
实现所有的代码
订阅者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import redisclass SubChannel (object ): def __init__ (self, topic ): self.topic = topic self.redis_conn = redis.Redis(host="localhost" , db=1 ) self.pubsub = self.redis_conn.pubsub() self.pubsub.subscribe("%s:channel" % topic) def run (self ): for i in self.pubsub.listen(): if i['type' ] == 'message' : print "Task get" , i['data' ] if __name__ == "__main__" : SubChannel("Test" ).run()
发布者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import redisclass PubChannel (object ): def __init__ (self, topic ): self.topic = topic self.redis_conn = redis.Redis(host="localhost" , db=1 ) self.pubsub = self.redis_conn.pubsub() def push (self, message ): self.redis_conn.publish("%s:channel" % self.topic, message)
然后我们打开一个Python交互器,我这里使用的是bpython
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 > bpython ~/code/Python/daily/redis-channel@BingLaudeMacBook-Pro.local bpython version 0.14 .2 on top of Python 2.7 .10 /usr/bin /python >>> from pub import PubChannel>>> p = PubChannel("Test" )>>> p.push("test" )>>> > bpython ~/code/Python/daily/redis-channel@BingLaudeMacBook-Pro.local bpython version 0.14 .2 on top of Python 2.7 .10 /usr/bin /python >>> from pub import PubChannel>>> p = PubChannel("Test" )>>> p.push("test" )>>> p = PubChannel("Test1" )>>> p.push("test" )Task get test
这里我们可以设置不同的Topic来实现推送不同类型的消息给不同的订阅者
持久化 由于Redis这种队列不能进行持久化,所以我们需要借助其他的存储来实现持久化,如我们可以使用Mysql
来记录推送的消息,然后在队列消息消耗完成之后将其值置为已完成的状态。
对于Mysql里面的数据,我们可以设置一个消耗时间,如果在某固定时间间隔内消息没有被消耗,我们可以采取相应的操作,如报警,重发等等。
最后 Redis实现消息队列还可以使用List来实现, 这种方法主要是依赖BLPOP
(删除和获取列表中的第一个元素,或阻塞直到有可用) BRPOP
(删除和获取列表中的最后一个元素,或阻塞直到有可用)这两个命令来实现。下面这篇文章很好的讲解了如何用这两个命令来实现消息队列,并附有优先级的实现方法。
用redis实现支持优先级的消息队列