重构、添加rabbitmq消费者接收日志功能

This commit is contained in:
gotoeasy 2022-07-05 11:13:21 +08:00
parent 9ea38f5153
commit 6caaf62422
19 changed files with 285 additions and 14 deletions

View File

@ -5,7 +5,7 @@ import (
"encoding/binary"
"encoding/gob"
"fmt"
"glc/ldb/conf"
"glc/conf"
"hash/crc32"
"os"
"strconv"

View File

@ -21,7 +21,10 @@ var contextPath string
var enableSecurityKey bool
var securityKey string
var headerSecurityKey string
var enableAmqpConsume bool
var enableWebGzip bool
var amqpAddr string
var amqpQueueName string
func init() {
UpdateConfigByEnv()
@ -39,6 +42,24 @@ func UpdateConfigByEnv() {
headerSecurityKey = Getenv("GLC_HEADER_SECURITY_KEY", "X-GLC-AUTH") // web服务API秘钥的header键名
securityKey = Getenv("GLC_SECURITY_KEY", "glogcenter") // web服务API秘钥
enableWebGzip = GetenvBool("GLC_ENABLE_WEB_GZIP", true) // web服务是否开启Gzip
enableAmqpConsume = GetenvBool("GLC_ENABLE_AMQP_CONSUME", false) // 是否开启rabbitMq消费者接收日志
amqpAddr = Getenv("GLC_AMQP_ADDR", "") // rabbitMq连接地址"amqp://user:password@ip:port/"
amqpQueueName = Getenv("GLC_AMQP_QUEUE_NAME", "glc-log-queue") // rabbitMq队列名
}
// 取配置: rabbitMq连接地址可通过环境变量“GLC_AMQP_ADDR”设定默认值“”
func GetAmqpQueueName() string {
return amqpQueueName
}
// 取配置: rabbitMq连接地址可通过环境变量“GLC_AMQP_ADDR”设定默认值“”
func GetAmqpAddr() string {
return amqpAddr
}
// 取配置: 是否开启rabbitMq消费者接收日志可通过环境变量“GLC_ENABLE_AMQP_CONSUME”设定默认值“false”
func IsEnableAmqpConsume() bool {
return enableAmqpConsume
}
// 取配置: web服务API秘钥的header键名可通过环境变量“GLC_HEADER_SECURITY_KEY”设定默认值“X-GLC-AUTH”

View File

@ -29,5 +29,6 @@ require (
github.com/gin-contrib/gzip v0.0.5
github.com/gin-gonic/gin v1.8.1
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/streadway/amqp v1.0.0
github.com/wangbin/jiebago v0.3.2
)

View File

@ -58,6 +58,8 @@ github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsK
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=

View File

@ -4,7 +4,7 @@ import (
"context"
"fmt"
"glc/cmn"
"glc/ldb/conf"
"glc/conf"
"glc/onexit"
"log"
"net/http"

View File

@ -7,7 +7,7 @@ package indexdoc
import (
"glc/cmn"
"glc/ldb/conf"
"glc/conf"
"glc/onexit"
"log"
"sync"

View File

@ -7,7 +7,7 @@ package indexword
import (
"glc/cmn"
"glc/ldb/conf"
"glc/conf"
"glc/ldb/storage/indexdoc"
"glc/onexit"
"log"

View File

@ -9,7 +9,7 @@ package logdata
import (
"errors"
"glc/cmn"
"glc/ldb/conf"
"glc/conf"
"glc/ldb/storage/indexword"
"glc/ldb/sysmnt"
"glc/ldb/tokenizer"

View File

@ -8,7 +8,7 @@ package sysmnt
import (
"errors"
"glc/cmn"
"glc/ldb/conf"
"glc/conf"
"glc/onexit"
"log"
"sync"

View File

@ -4,7 +4,7 @@
package tokenizer
import (
"glc/ldb/conf"
"glc/conf"
"os"
"strings"

View File

@ -1,11 +1,12 @@
package onstart
import (
"glc/conf"
"glc/gweb"
"glc/gweb/http"
"glc/gweb/method"
"glc/ldb"
"glc/ldb/conf"
"glc/rabbitmq"
"glc/www/controller"
"glc/www/filter"
"glc/www/html"
@ -41,6 +42,8 @@ func Run() {
// 默认引擎空转一下,触发未建索引继续建
go ldb.NewDefaultEngine().AddTextLog("", "", "")
rabbitmq.Start()
})
}

View File

@ -0,0 +1,69 @@
package consume
import (
"glc/ldb"
"glc/ldb/storage/logdata"
"glc/onexit"
"log"
"sync"
)
type RabbitMQConsume struct {
rabbitMQ *RabbitMQ
mu sync.Mutex // 锁
}
var rabbitMQConsume *RabbitMQConsume
func init() {
rabbitMQConsume = new(RabbitMQConsume)
onexit.RegisterExitHandle(onExit)
}
func StartRabbitMQConsume() error {
if rabbitMQConsume.rabbitMQ != nil && !rabbitMQConsume.rabbitMQ.closing {
return nil
}
rabbitMQConsume.mu.Lock()
defer rabbitMQConsume.mu.Unlock()
mq, err := NewSimpleRabbitMQ()
if err != nil {
return err
} else {
rabbitMQConsume.rabbitMQ = mq
rabbitMQConsume.rabbitMQ.StartConsume(fnAmqpJsonLogHandle)
}
return nil
}
func StopRabbitMQConsume() {
if rabbitMQConsume.rabbitMQ != nil {
rabbitMQConsume.rabbitMQ.Close()
}
}
func fnAmqpJsonLogHandle(jsonLog string, err error) bool {
if err != nil {
log.Println(err)
return false
}
log.Println("接收到rabbitmq的日志", jsonLog)
md := &logdata.LogDataModel{}
md.LoadJson(jsonLog)
engine := ldb.NewDefaultEngine()
engine.AddTextLog(md.Date, md.Text, md.System)
return true
}
func onExit() {
rabbitMQConsume.mu.Lock()
defer rabbitMQConsume.mu.Unlock()
rabbitMQConsume.rabbitMQ.Close()
}

View File

@ -0,0 +1,121 @@
package consume
import (
"fmt"
"glc/conf"
"github.com/streadway/amqp"
)
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
QueueName string
Path string
closing bool
}
// 实例化(简单模式)
func NewSimpleRabbitMQ() (*RabbitMQ, error) {
rabbitmq := &RabbitMQ{
QueueName: conf.GetAmqpQueueName(),
Path: conf.GetAmqpAddr(),
}
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Path)
if err != nil {
return nil, err
}
rabbitmq.channel, err = rabbitmq.conn.Channel()
if err != nil {
return nil, err
}
return rabbitmq, nil
}
// 关闭连接
func (r *RabbitMQ) Close() {
if r.closing {
return
}
r.closing = true
if r.channel != nil {
r.channel.Close()
}
if r.channel != nil {
r.conn.Close()
}
}
// 简单模式生产者
func (r *RabbitMQ) SimplePublish(message string) {
_, err := r.channel.QueueDeclare(
r.QueueName,
false, // durable 是否持久化数据
false, // autoDelete 是否自动删除
false, // exclusive 排他性,权限私有
false, // noWaite 是否阻塞
nil,
)
if err != nil {
fmt.Println(err)
}
r.channel.Publish(
"", // 交换机
r.QueueName, // 队列名
false, // mandatory 如果是true会根据exchange和routekey规则如果无法找到符合条件的队列会把消息的消息返回给发送者
false, // immediate true 表示当exchange将消息发送到队列后发现队列没有绑定消费者则会把消息发回给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
}
// 简单模式消费者
func (r *RabbitMQ) StartConsume(fnJsonLogHandle func(string, error) bool) {
_, err := r.channel.QueueDeclare(
r.QueueName, // 队列名
false, // durable 是否持久化数据
false, // autoDelete 是否自动删除
false, // exclusive 排他性,权限私有
false, // noWaite 是否阻塞
nil, // arguments
)
if err != nil {
fnJsonLogHandle("", err)
return
}
mqDelivery, err := r.channel.Consume(
r.QueueName, // 队列名
"", // 区分多个消费者
true, // autoAck 是否自动应答
false, // exclusive 是否排它性
false, // noLocal true表示不能将同一个connection中发送的消息传递给这个connection中的消费者
false, // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
nil, // 其他参数
)
if err != nil {
fnJsonLogHandle("", err)
return
}
// channelWait := make(chan bool)
go func() {
for msg := range mqDelivery {
// TODO 是否要考虑失败?
fnJsonLogHandle(string(msg.Body), nil) // 处理接收到的日志
if r.closing {
break
}
}
}()
// <-channelWait
}

View File

@ -0,0 +1,31 @@
package rabbitmq
import (
"glc/conf"
"glc/rabbitmq/consume"
"log"
"time"
)
func Start() {
go func() {
if conf.IsEnableAmqpConsume() {
err := consume.StartRabbitMQConsume()
if err != nil {
log.Println("RabbitMQ连接失败10秒后再试", err)
timer := time.NewTimer(time.Second * 10)
<-timer.C
Start() // 10秒后再试
} else {
log.Println("启动RabbitMQ日志消费")
}
}
}()
}
func Stop() {
if conf.IsEnableAmqpConsume() {
consume.StopRabbitMQConsume()
log.Println("停止RabbitMQ日志消费")
}
}

View File

@ -0,0 +1,23 @@
package rabbitmq
import (
"glc/rabbitmq/consume"
"log"
"testing"
"time"
)
func Test_all(t *testing.T) {
r, err := consume.NewSimpleRabbitMQ()
if err != nil {
log.Println(err)
}
for i := 0; i < 1456; i++ {
r.SimplePublish("{\"text\":\"aaa bbb ccc ddd eee\",\"system\":\"rabbitmq\"}")
}
// Start()
time.Sleep(time.Duration(10) * time.Second)
}

View File

@ -1,8 +1,8 @@
package controller
import (
"glc/conf"
"glc/gweb"
"glc/ldb/conf"
)
// 前端检索页面

View File

@ -1,9 +1,9 @@
package controller
import (
"glc/conf"
"glc/gweb"
"glc/ldb"
"glc/ldb/conf"
"glc/ldb/storage/logdata"
"log"
)

View File

@ -2,8 +2,8 @@ package html
import (
"glc/cmn"
"glc/conf"
"glc/gweb"
"glc/ldb/conf"
"glc/www"
"log"
"os"

View File

@ -118,8 +118,8 @@ export default {
let res = rs.data
if (res.success) {
if (res.result.data) {
this.data.push(...res.result.data)
if (res.result.data.length < params.pageSize) {
this.data.push(...res.result.data)
if (res.result.data.length < this.params.pageSize) {
this.info = `日志总量 ${res.result.total} 条,当前条件最多匹配 ${this.data.length} 条,正展示前 ${this.data.length}`
}else{
this.info = `日志总量 ${res.result.total} 条,当前条件最多匹配 ${res.result.count} 条,正展示前 ${this.data.length}`
@ -141,7 +141,7 @@ export default {
// console.info(res,"xxxxxxxxxxxxxxxxxxxxxxx")
this.data = res.result.data || [];
document.querySelector('.el-scrollbar__wrap').scrollTop = 0; //
if (res.result.data.length < params.pageSize) {
if (this.data.length < this.params.pageSize) {
this.info = `日志总量 ${res.result.total} 条,当前条件最多匹配 ${this.data.length} 条,正展示前 ${this.data.length}`
}else{
this.info = `日志总量 ${res.result.total} 条,当前条件最多匹配 ${res.result.count} 条,正展示前 ${this.data.length}`