golang gin 监听rabbitmq队列无限消费
连接rabbitmq
package database
import (
"github.com/streadway/amqp"
"log"
"reflect"
"yy-data-processing/common/config"
)
var RabbitConn *amqp.Connection
var RabbitChannel *amqp.Channel
func InitRabbitmq() {
var err error
RabbitConn, err = amqp.Dial(config.Config.RabbitUrl)
if err != nil {
log.Println("连接RabbitMQ失败")
panic(err)
}
RabbitChannel, err = RabbitConn.Channel()
if err != nil {
log.Println("获取RabbitMQ channel失败")
panic(err)
}
}
// 0表示channel未关闭,1表示channel已关闭
func CheckRabbitClosed(ch amqp.Channel) int64 {
d := reflect.ValueOf(ch)
i := d.FieldByName("closed").Int()
return i
}
创建生产者
package service
import (
"encoding/json"
"github.com/streadway/amqp"
"log"
"yy-data-processing/common/config"
"yy-data-processing/common/database"
"yy-data-processing/model"
)
func Producer() {
// 声明队列,没有则创建
// 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列)
declare, err := database.RabbitChannel.QueueDeclare(config.Config.HawkSaveQueueName, true, false, false, false, nil)
if err != nil {
log.Printf("声明队列 %v 失败, error: %v", config.Config.HawkSaveQueueName, err)
panic(err)
}
request := model.Request{}
marshal, _ := json.Marshal(request )
// exchange、routing key、mandatory、immediate
err = database.RabbitChannel.Publish("", declare.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(marshal),
})
if err != nil {
log.Printf("生产者发送消息失败, error: %v", err)
} else {
log.Println("生产者发送消息成功")
}
}
创建消费者
package service
import (
"encoding/json"
"log"
"os"
"strings"
"sync"
"time"
"yy-data-processing/common/config"
"yy-data-processing/common/database"
"yy-data-processing/model"
)
func Consumer() {
// 声明队列,没有则创建
// 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列)
_, err := database.RabbitChannel.QueueDeclare(config.Config.QueueName, true, false, false, false, nil)
if err != nil {
log.Printf("声明队列 %v 失败, error: %v", config.Config.QueueName, err)
panic(err)
}
// 队列名称、consumer、auto-ack、是否独享
// deliveries是一个管道,有消息到队列,就会消费,消费者的消息只需要从deliveries这个管道获取
deliveries, err := database.RabbitChannel.Consume(config.Config.QueueName, "", true, false, false, false, nil)
if err != nil {
log.Printf("从队列 %v 获取数据失败, error: %v", config.Config.QueueName, err)
} else {
log.Println("从消费队列获取任务成功")
}
// 阻塞住
for {
select {
case message := <-deliveries:
closed := database.CheckRabbitClosed(*database.RabbitChannel)
if closed == 1 { // channel 已关闭,重连一下
database.InitRabbitmq()
} else {
msgData := string(message.Body)
request := model.Request{}
err := json.Unmarshal([]byte(msgData), &request)
if err != nil {
log.Printf("解析rabbitmq数据 %v 失败, error: %v", msgData, err)
} else {
// TODO...
// 处理逻辑
}
}
}
}
}
main方法协程调用
package main
import (
"log"
"yy-data-processing/common/config"
"yy-data-processing/common/database"
"yy-data-processing/router"
"yy-data-processing/service"
)
func main() {
// 初始化路由
routers := router.InitRouters()
// 初始化RabbitMQ
database.InitRabbitmq()
go service.Producer()
go service.Consumer()
port := config.Config.Port
if err := routers.Run(":" + port); err != nil {
log.Printf("启动服务失败: ", err)
}
}