日志仓管理

This commit is contained in:
gotoeasy 2022-07-09 11:04:23 +08:00
parent cb05d49707
commit 075b5177e4
7 changed files with 79 additions and 42 deletions

View File

@ -156,6 +156,7 @@ func GeyStoreNameByDate(name string) string {
}
if conf.IsStoreNameAutoAddDate() {
return fmt.Sprint(name, "-", time.Now().Format("20060102")) // name-yyyymmdd
// return fmt.Sprint(name, "-", time.Now().Format("200601021504")) // name-yyyymmddHHMM
}
return name
}

View File

@ -33,20 +33,20 @@ func init() {
func UpdateConfigByEnv() {
// 读取环境变量初始化配置,各配置都有默认值
storeRoot = Getenv("GLC_STORE_ROOT", "/glogcenter") // 存储根目录
storeChanLength = GetenvInt("GLC_STORE_CHAN_LENGTH", 64) // 存储通道长度
maxIdleTime = GetenvInt("GLC_MAX_IDLE_TIME", 180) // 最大闲置时间(秒),超过闲置时间将自动关闭0时表示不关闭
storeNameAutoAddDate = GetenvBool("GLC_STORE_NAME_AUTO_ADD_DATE", false) // 存储名是否自动添加日期日志量大通常按日单位区分存储默认true
serverPort = GetenvInt("GLC_SERVER_PORT", 8080) // web服务端口
contextPath = Getenv("GLC_CONTEXT_PATH", "/glc") // web服务contextPath
enableSecurityKey = GetenvBool("GLC_ENABLE_SECURITY_KEY", false) // web服务是否开启API秘钥校验默认false
headerSecurityKey = Getenv("GLC_HEADER_SECURITY_KEY", "X-GLC-AUTH") // web服务API秘钥的header键名
securityKey = Getenv("GLC_SECURITY_KEY", "glogcenter") // web服务API秘钥
enableWebGzip = GetenvBool("GLC_ENABLE_WEB_GZIP", true) // web服务是否开启Gzip
enableAmqpConsume = GetenvBool("GLC_ENABLE_AMQP_CONSUME", false) // 是否开启rabbitMq消费者接收日志
amqpAddr = Getenv("GLC_AMQP_ADDR", "") // rabbitMq连接地址"amqp://user:password@ip:port/"
amqpQueueName = Getenv("GLC_AMQP_QUEUE_NAME", "glc-log-queue") // rabbitMq队列名
amqpJsonFormat = GetenvBool("GLC_AMQP_JSON_FORMAT", true) // rabbitMq消息文本是否为json格式默认true
storeRoot = Getenv("GLC_STORE_ROOT", "/glogcenter") // 存储根目录
storeChanLength = GetenvInt("GLC_STORE_CHAN_LENGTH", 64) // 存储通道长度
maxIdleTime = GetenvInt("GLC_MAX_IDLE_TIME", 180) // 最大闲置时间(秒),超过闲置时间将自动关闭0时表示不关闭
storeNameAutoAddDate = GetenvBool("GLC_STORE_NAME_AUTO_ADD_DATE", true) // 存储名是否自动添加日期日志量大通常按日单位区分存储默认true
serverPort = GetenvInt("GLC_SERVER_PORT", 8080) // web服务端口
contextPath = Getenv("GLC_CONTEXT_PATH", "/glc") // web服务contextPath
enableSecurityKey = GetenvBool("GLC_ENABLE_SECURITY_KEY", false) // web服务是否开启API秘钥校验默认false
headerSecurityKey = Getenv("GLC_HEADER_SECURITY_KEY", "X-GLC-AUTH") // web服务API秘钥的header键名
securityKey = Getenv("GLC_SECURITY_KEY", "glogcenter") // web服务API秘钥
enableWebGzip = GetenvBool("GLC_ENABLE_WEB_GZIP", true) // web服务是否开启Gzip
enableAmqpConsume = GetenvBool("GLC_ENABLE_AMQP_CONSUME", false) // 是否开启rabbitMq消费者接收日志
amqpAddr = Getenv("GLC_AMQP_ADDR", "") // rabbitMq连接地址"amqp://user:password@ip:port/"
amqpQueueName = Getenv("GLC_AMQP_QUEUE_NAME", "glc-log-queue") // rabbitMq队列名
amqpJsonFormat = GetenvBool("GLC_AMQP_JSON_FORMAT", true) // rabbitMq消息文本是否为json格式默认true
}
// 取配置: rabbitMq消息文本是否为json格式可通过环境变量“GLC_AMQP_JSON_FORMAT”设定默认值“true”

View File

@ -17,7 +17,7 @@ type Engine struct {
func NewEngine(storeName string) *Engine {
if storeName == "" {
storeName = cmn.GeyStoreNameByDate("logdata")
storeName = cmn.GeyStoreNameByDate("")
}
return &Engine{
@ -28,7 +28,7 @@ func NewEngine(storeName string) *Engine {
}
func NewDefaultEngine() *Engine {
var storeName string = cmn.GeyStoreNameByDate("logdata")
var storeName string = cmn.GeyStoreNameByDate("")
return &Engine{
storeName: storeName,
logStorage: storage.NewLogDataStorageHandle(storeName),

View File

@ -12,6 +12,7 @@ import (
"glc/conf"
"glc/ldb/status"
"glc/ldb/storage/indexword"
"glc/ldb/sysmnt"
"glc/ldb/tokenizer"
"glc/onexit"
"log"
@ -55,8 +56,9 @@ func getCacheStore(cacheName string) *LogDataStorage {
}
// 获取存储对象,线程安全(带缓存无则创建有则直取)
func NewLogDataStorage(storeName string, subPath string) *LogDataStorage { // 存储器,文档,自定义对象
func NewLogDataStorage(storeName string) *LogDataStorage { // 存储器,文档,自定义对象
subPath := "data"
// 缓存有则取用
cacheName := storeName + cmn.PathSeparator() + subPath
cacheStore := getCacheStore(cacheName)
@ -304,13 +306,17 @@ func (s *LogDataStorage) saveMetaData() {
if s.savedCurrentCount < s.currentCount {
s.savedCurrentCount = s.currentCount
s.leveldb.Put(zeroUint32Bytes, cmn.Uint32ToBytes(s.savedCurrentCount), nil) // 保存日志总件数
sysmntStore := sysmnt.NewSysmntStorage() // 系统管理存储器
sysmntStore.SetStorageDataCount(s.storeName, s.savedCurrentCount) // 保存日志总件数
log.Println("保存LogDataStorage件数:", s.savedCurrentCount)
}
if s.savedIndexedCount < s.indexedCount {
s.savedIndexedCount = s.indexedCount
idxw := indexword.NewWordIndexStorage(s.StoreName())
idxw.SaveIndexedCount(s.savedIndexedCount)
idxw.SaveIndexedCount(s.savedIndexedCount) // 保存索引总件数
sysmntStore := sysmnt.NewSysmntStorage() // 系统管理存储器
sysmntStore.SetStorageIndexCount(s.storeName, s.savedCurrentCount) // 保存索引总件数
log.Println("保存LogDataStorage已建索引件数:", s.savedIndexedCount)
}
}

View File

@ -35,7 +35,7 @@ func NewLogDataStorageHandle(storeName string) *LogDataStorageHandle {
}
store := &LogDataStorageHandle{
storage: logdata.NewLogDataStorage(storeName, "data"),
storage: logdata.NewLogDataStorage(storeName),
}
mapStorageHandle[storeName] = store
return store
@ -58,13 +58,13 @@ func (s *LogDataStorageHandle) AddTextLog(date string, logText string, system st
d.System = system
if s.storage.IsClose() {
s.storage = logdata.NewLogDataStorage(s.storage.StoreName(), "data")
s.storage = logdata.NewLogDataStorage(s.storage.StoreName())
}
err := s.storage.Add(d)
if err != nil {
log.Println("竟然失败,再来一次", s.storage.IsClose(), err)
if s.storage.IsClose() {
s.storage = logdata.NewLogDataStorage(s.storage.StoreName(), "data")
s.storage = logdata.NewLogDataStorage(s.storage.StoreName())
}
s.storage.Add(d)
}
@ -79,13 +79,13 @@ func (s *LogDataStorageHandle) AddLogDataModel(data *logdata.LogDataModel) {
}
if s.storage.IsClose() {
s.storage = logdata.NewLogDataStorage(s.storage.StoreName(), "data")
s.storage = logdata.NewLogDataStorage(s.storage.StoreName())
}
err := s.storage.Add(data)
if err != nil {
log.Println("竟然失败,再来一次", s.storage.IsClose(), err)
if s.storage.IsClose() {
s.storage = logdata.NewLogDataStorage(s.storage.StoreName(), "data")
s.storage = logdata.NewLogDataStorage(s.storage.StoreName())
}
s.storage.Add(data)
}

View File

@ -7,8 +7,6 @@ import (
"fmt"
"glc/cmn"
"glc/conf"
"glc/ldb/storage"
"glc/ldb/storage/indexword"
"os"
)
@ -42,10 +40,9 @@ func GetStorageList() *StorageResult {
d.LogCount = 0
d.IndexCount = 0
} else {
store := storage.NewLogDataStorageHandle(name)
d.LogCount = store.TotalCount()
widx := indexword.NewWordIndexStorage(name)
d.IndexCount = widx.GetIndexedCount()
sysmntStore := NewSysmntStorage()
d.LogCount = sysmntStore.GetStorageDataCount(name)
d.IndexCount = sysmntStore.GetStorageIndexCount(name)
}
datas = append(datas, d)
@ -66,5 +63,9 @@ func DeleteStorage(name string) error {
if err != nil {
return err
}
return os.RemoveAll(newPath)
err = os.RemoveAll(newPath)
if err != nil {
return err
}
return NewSysmntStorage().DeleteStorageInfo(name)
}

View File

@ -19,11 +19,10 @@ import (
// 存储结构体
type SysmntStorage struct {
storeName string // 存储目录
subPath string // 存储目录下的相对路径(存放数据)
leveldb *leveldb.DB // leveldb
lastTime int64 // 最后一次访问时间
closing bool // 是否关闭中状态
subPath string // 存储目录下的相对路径(存放数据)
leveldb *leveldb.DB // leveldb
lastTime int64 // 最后一次访问时间
closing bool // 是否关闭中状态
}
var sdbMu sync.Mutex // 锁
@ -34,7 +33,7 @@ func init() {
}
// 获取存储对象,线程安全(带缓存无则创建有则直取)
func GetSysmntStorage(storeName string) *SysmntStorage { // 存储器,文档,自定义对象
func NewSysmntStorage() *SysmntStorage { // 存储器,文档,自定义对象
// 缓存有则取用
subPath := ".sysmnt"
@ -51,7 +50,6 @@ func GetSysmntStorage(storeName string) *SysmntStorage { // 存储器,文档
}
store := new(SysmntStorage)
store.storeName = storeName
store.subPath = subPath
store.closing = false
store.lastTime = time.Now().Unix()
@ -105,6 +103,42 @@ func (s *SysmntStorage) Close() {
log.Println("关闭SysmntStorage", s.subPath)
}
func (s *SysmntStorage) GetStorageDataCount(storeName string) uint32 {
bt, err := s.Get(cmn.StringToBytes("data:" + storeName))
if err != nil {
return 0
}
return cmn.BytesToUint32(bt)
}
func (s *SysmntStorage) SetStorageDataCount(storeName string, count uint32) {
s.Put(cmn.StringToBytes("data:"+storeName), cmn.Uint32ToBytes(count))
}
func (s *SysmntStorage) GetStorageIndexCount(storeName string) uint32 {
bt, err := s.Get(cmn.StringToBytes("index:" + storeName))
if err != nil {
return 0
}
return cmn.BytesToUint32(bt)
}
func (s *SysmntStorage) SetStorageIndexCount(storeName string, count uint32) {
s.Put(cmn.StringToBytes("index:"+storeName), cmn.Uint32ToBytes(count))
}
func (s *SysmntStorage) DeleteStorageInfo(storeName string) error {
err := s.leveldb.Delete(cmn.StringToBytes("data:"+storeName), nil)
if err != nil {
return err
}
err = s.leveldb.Delete(cmn.StringToBytes("index:"+storeName), nil)
if err != nil {
return err
}
return nil
}
// 直接存入数据到leveldb
func (s *SysmntStorage) Put(key []byte, value []byte) error {
if s.closing {
@ -123,11 +157,6 @@ func (s *SysmntStorage) Get(key []byte) ([]byte, error) {
return s.leveldb.Get(key, nil)
}
// 存储目录名
func (s *SysmntStorage) StoreName() string {
return s.storeName
}
// 是否关闭中状态
func (s *SysmntStorage) IsClose() bool {
return s.closing