From 753bb27bb6a3d38e687391e4a164b6adea210636 Mon Sep 17 00:00:00 2001 From: gotoeasy Date: Sun, 3 Jul 2022 11:07:30 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- glc/ldb/storage/indexdoc/index_doc_storage.go | 3 +- .../storage/indexword/index_word_storage.go | 5 +- glc/ldb/storage/logdata/logdata_storage.go | 74 ++++++++++++------- 3 files changed, 51 insertions(+), 31 deletions(-) diff --git a/glc/ldb/storage/indexdoc/index_doc_storage.go b/glc/ldb/storage/indexdoc/index_doc_storage.go index 26c75d1..bdfd7dd 100644 --- a/glc/ldb/storage/indexdoc/index_doc_storage.go +++ b/glc/ldb/storage/indexdoc/index_doc_storage.go @@ -47,7 +47,8 @@ func getStorage(cacheName string) *DocIndexStorage { func NewDocIndexStorage(storeName string, word string) *DocIndexStorage { // 存储器,文档,自定义对象 // 缓存有则取用 - subPath := "inverted" + cmn.PathSeparator() + "d_" + cmn.HashAndMod(word, 10) + subPath := "inverted" + cmn.PathSeparator() + "d" + // subPath := "inverted" + cmn.PathSeparator() + "d_" + cmn.HashAndMod(word, 10) cacheName := storeName + cmn.PathSeparator() + subPath cacheStore := getStorage(cacheName) if cacheStore != nil { diff --git a/glc/ldb/storage/indexword/index_word_storage.go b/glc/ldb/storage/indexword/index_word_storage.go index b855196..6bbdcb9 100644 --- a/glc/ldb/storage/indexword/index_word_storage.go +++ b/glc/ldb/storage/indexword/index_word_storage.go @@ -50,7 +50,8 @@ func getStorage(cacheName string) *WordIndexStorage { func NewWordIndexStorage(storeName string, word string) *WordIndexStorage { // 存储器,文档,自定义对象 // 缓存有则取用 - subPath := "inverted" + cmn.PathSeparator() + "k_" + cmn.HashAndMod(word, 10) + subPath := "inverted" + cmn.PathSeparator() + "k" + // subPath := "inverted" + cmn.PathSeparator() + "k_" + cmn.HashAndMod(word, 10) cacheName := storeName + cmn.PathSeparator() + subPath cacheStore := getStorage(cacheName) if cacheStore != nil { @@ -145,7 +146,7 @@ func (s *WordIndexStorage) Add(word string, docId uint32) error { log.Println("保存关键词反向索引件数失败", err) return err // 忽略事务问题,可下回重建 } - // log.Println("创建日志索引:", docId, ",关键词:", s.word) + // log.Println("创建日志索引:", docId, ",关键词:", word) return nil } diff --git a/glc/ldb/storage/logdata/logdata_storage.go b/glc/ldb/storage/logdata/logdata_storage.go index c726b47..34fc276 100644 --- a/glc/ldb/storage/logdata/logdata_storage.go +++ b/glc/ldb/storage/logdata/logdata_storage.go @@ -29,6 +29,8 @@ type LogDataStorage struct { leveldb *leveldb.DB // leveldb currentCount uint32 // 当前件数 savedCurrentCount uint32 // 已保存的当前件数 + indexedCount uint32 // 已创建的索引件数 + savedIndexedCount uint32 // 已保存的索引件数 lastTime int64 // 最后一次访问时间 closing bool // 是否关闭中状态 mu sync.Mutex // 锁 @@ -84,14 +86,14 @@ func NewLogDataStorage(storeName string, subPath string) *LogDataStorage { // panic(err) } store.leveldb = db - store.currentCount = store.loadTotalCount() // 读取总件数 - mapStorage[cacheName] = store // 缓存起来 + store.loadMetaData() // 初始化件数等信息 + mapStorage[cacheName] = store // 缓存起来 // 消费就绪 go store.readyGo() // 定时判断保存总件数,避免每次保存以提高性能 - go store.readySaveCurrentCount() + go store.readySaveMetaDate() // 逐秒判断,若闲置超时则自动关闭 go store.autoCloseWhenMaxIdle() @@ -101,7 +103,7 @@ func NewLogDataStorage(storeName string, subPath string) *LogDataStorage { // } // 等待接收日志,优先响应保存日志,空时再生成索引 -func (s *LogDataStorage) readySaveCurrentCount() { +func (s *LogDataStorage) readySaveMetaDate() { ticker := time.NewTicker(time.Second * 5) for { <-ticker.C @@ -109,19 +111,10 @@ func (s *LogDataStorage) readySaveCurrentCount() { ticker.Stop() break } - s.saveCurrentCount() + s.saveMetaData() } } -func (s *LogDataStorage) saveCurrentCount() { - if s.currentCount == s.savedCurrentCount { - return - } - s.savedCurrentCount = s.currentCount - s.leveldb.Put(zeroUint32Bytes, cmn.Uint32ToBytes(s.savedCurrentCount), nil) // 保存日志总件数 - log.Println("保存LogDataStorage件数:", s.savedCurrentCount) -} - // 等待接收日志,优先响应保存日志,空时再生成索引 func (s *LogDataStorage) readyGo() { for { @@ -174,17 +167,15 @@ func (s *LogDataStorage) saveLogData(model *LogDataModel) { func (s *LogDataStorage) createInvertedIndex() int { // 索引信息和日志数量相互比较,判断是否继续创建索引 - mntKey := "INDEX:" + s.StoreName() - sysStorage := sysmnt.GetSysmntStorage(s.StoreName()) - sysmntData := sysStorage.GetSysmntData(mntKey) - if s.TotalCount() == 0 || sysmntData.Count >= s.TotalCount() { + + if s.TotalCount() == 0 || s.indexedCount >= s.TotalCount() { return 0 // 没有新的日志需要建索引 } - sysmntData.Count++ // 下一条要建索引的日志id - docm, err := s.GetLogDataModel(sysmntData.Count) // 取出日志模型数据 + s.indexedCount++ // 下一条要建索引的日志id + docm, err := s.GetLogDataModel(s.indexedCount) // 取出日志模型数据 if err != nil { - log.Println("取日志模型数据失败:", sysmntData.Count, err) + log.Println("取日志模型数据失败:", s.indexedCount, err) return 2 } @@ -202,9 +193,6 @@ func (s *LogDataStorage) createInvertedIndex() int { } //log.Println("创建日志索引:", cmn.StringToUint32(docm.Id, 0)) - // 保存当前创建了多少索引 - sysStorage.SetSysmntData(mntKey, sysmntData) - return 1 } @@ -284,7 +272,7 @@ func (s *LogDataStorage) Close() { s.closing = true s.wg.Wait() // 等待通道清空 - s.saveCurrentCount() // 判断保存总件数 + s.saveMetaData() // 保存件数等元信息 s.wg.Add(1) // 通道消息计数 s.storeChan <- nil // 通道正在在阻塞等待接收,给个nil让它接收后关闭 s.leveldb.Close() // 走到这里时没有db操作了,可以关闭 @@ -293,12 +281,42 @@ func (s *LogDataStorage) Close() { log.Println("关闭LogDataStorage:", s.storeName+cmn.PathSeparator()+s.subPath) } -func (s *LogDataStorage) loadTotalCount() uint32 { +func (s *LogDataStorage) loadMetaData() { + + // 初始化:当前索引件数 bytes, err := s.leveldb.Get(zeroUint32Bytes, nil) if err != nil || bytes == nil { - return 0 + s.currentCount = 0 + } else { + s.currentCount = cmn.BytesToUint32(bytes) + s.savedCurrentCount = s.currentCount + } + + // 初始化:已建索引件数 + mntKey := "INDEX:" + s.StoreName() + sysStorage := sysmnt.GetSysmntStorage(s.StoreName()) + sysmntData := sysStorage.GetSysmntData(mntKey) + s.indexedCount = sysmntData.Count + s.savedIndexedCount = s.indexedCount +} + +func (s *LogDataStorage) saveMetaData() { + + if s.savedCurrentCount < s.currentCount { + s.savedCurrentCount = s.currentCount + s.leveldb.Put(zeroUint32Bytes, cmn.Uint32ToBytes(s.savedCurrentCount), nil) // 保存日志总件数 + log.Println("保存LogDataStorage件数:", s.savedCurrentCount) + } + + if s.savedIndexedCount < s.indexedCount { + s.savedIndexedCount = s.indexedCount + mntKey := "INDEX:" + s.StoreName() + sysStorage := sysmnt.GetSysmntStorage(s.StoreName()) + sysmntData := new(sysmnt.SysmntData) + sysmntData.Count = s.savedCurrentCount + sysStorage.SetSysmntData(mntKey, sysmntData) + log.Println("保存LogDataStorage已建索引件数:", s.savedIndexedCount) } - return cmn.BytesToUint32(bytes) } // 总件数