diff options
Diffstat (limited to 'weed/util')
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_in_memory.go | 2 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk.go | 4 | ||||
| -rw-r--r-- | weed/util/config.go | 65 | ||||
| -rw-r--r-- | weed/util/constants.go | 2 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 24 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 8 |
6 files changed, 80 insertions, 25 deletions
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go index 931e45e9a..1eb00e1fa 100644 --- a/weed/util/chunk_cache/chunk_cache_in_memory.go +++ b/weed/util/chunk_cache/chunk_cache_in_memory.go @@ -3,7 +3,7 @@ package chunk_cache import ( "time" - "github.com/karlseguin/ccache" + "github.com/karlseguin/ccache/v2" ) // a global cache for recently accessed file chunks diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go index 356dfe188..d724e925e 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -107,9 +107,9 @@ func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) { return nil, storage.ErrorNotFound } data := make([]byte, nv.Size) - if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil { + if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()); readErr != nil { return nil, fmt.Errorf("read %s.dat [%d,%d): %v", - v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr) + v.fileName, nv.Offset.ToActualOffset(), nv.Offset.ToActualOffset()+int64(nv.Size), readErr) } else { if readSize != int(nv.Size) { return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size) diff --git a/weed/util/config.go b/weed/util/config.go index 94e621e34..ee805f26a 100644 --- a/weed/util/config.go +++ b/weed/util/config.go @@ -2,6 +2,7 @@ package util import ( "strings" + "sync" "github.com/spf13/viper" @@ -28,11 +29,11 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) { glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed()) if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file - logLevel := glog.Level(0) if strings.Contains(err.Error(), "Not Found") { - logLevel = 1 + glog.V(1).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) + } else { + glog.Fatalf("Reading %s: %v", viper.ConfigFileUsed(), err) } - glog.V(logLevel).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) if required { glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ "\n\nPlease use this command to generate the default %s.toml file\n"+ @@ -46,11 +47,55 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) { return true } -func GetViper() *viper.Viper { - v := &viper.Viper{} - *v = *viper.GetViper() - v.AutomaticEnv() - v.SetEnvPrefix("weed") - v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) - return v +type ViperProxy struct { + *viper.Viper + sync.Mutex +} + +var ( + vp = &ViperProxy{} +) + +func (vp *ViperProxy) SetDefault(key string, value interface{}) { + vp.Lock() + defer vp.Unlock() + vp.Viper.SetDefault(key, value) +} + +func (vp *ViperProxy) GetString(key string) string { + vp.Lock() + defer vp.Unlock() + return vp.Viper.GetString(key) +} + +func (vp *ViperProxy) GetBool(key string) bool { + vp.Lock() + defer vp.Unlock() + return vp.Viper.GetBool(key) +} + +func (vp *ViperProxy) GetInt(key string) int { + vp.Lock() + defer vp.Unlock() + return vp.Viper.GetInt(key) +} + +func (vp *ViperProxy) GetStringSlice(key string) []string { + vp.Lock() + defer vp.Unlock() + return vp.Viper.GetStringSlice(key) +} + +func GetViper() *ViperProxy { + vp.Lock() + defer vp.Unlock() + + if vp.Viper == nil { + vp.Viper = viper.GetViper() + vp.AutomaticEnv() + vp.SetEnvPrefix("weed") + vp.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + } + + return vp } diff --git a/weed/util/constants.go b/weed/util/constants.go index 89155e9a2..6001ae78e 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 16) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 24) COMMIT = "" ) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index e4310b5c5..f84c674ff 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -28,6 +28,7 @@ type LogBuffer struct { pos int startTime time.Time stopTime time.Time + lastFlushTime time.Time sizeBuf []byte flushInterval time.Duration flushFn func(startTime, stopTime time.Time, buf []byte) @@ -129,6 +130,7 @@ func (m *LogBuffer) loopFlush() { // fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes())) m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) d.releaseMemory() + m.lastFlushTime = d.stopTime } } } @@ -174,10 +176,14 @@ func (d *dataToFlush) releaseMemory() { bufferPool.Put(d.data) } -func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) { +func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer, err error) { m.RLock() defer m.RUnlock() + if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) { + return nil, ResumeFromDiskError + } + /* fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers)) for i, prevBuf := range m.prevBuffers.buffers { @@ -186,11 +192,11 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu */ if lastReadTime.Equal(m.stopTime) { - return nil + return nil, nil } if lastReadTime.After(m.stopTime) { // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime) - return nil + return nil, nil } if lastReadTime.Before(m.startTime) { // println("checking ", lastReadTime.UnixNano()) @@ -198,19 +204,19 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu if buf.startTime.After(lastReadTime) { if i == 0 { // println("return the earliest in memory", buf.startTime.UnixNano()) - return copiedBytes(buf.buf[:buf.size]) + return copiedBytes(buf.buf[:buf.size]), nil } // println("return the", i, "th in memory", buf.startTime.UnixNano()) - return copiedBytes(buf.buf[:buf.size]) + return copiedBytes(buf.buf[:buf.size]), nil } if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) { pos := buf.locateByTs(lastReadTime) // fmt.Printf("locate buffer[%d] pos %d\n", i, pos) - return copiedBytes(buf.buf[pos:buf.size]) + return copiedBytes(buf.buf[pos:buf.size]), nil } } // println("return the current buf", lastReadTime.UnixNano()) - return copiedBytes(m.buf[:m.pos]) + return copiedBytes(m.buf[:m.pos]), nil } lastTs := lastReadTime.UnixNano() @@ -243,7 +249,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu } if prevT <= lastTs { // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos) - return copiedBytes(m.buf[pos:m.pos]) + return copiedBytes(m.buf[pos:m.pos]), nil } h = mid } @@ -251,7 +257,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu } // FIXME: this could be that the buffer has been flushed already - return nil + return nil, nil } func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) { diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 57f4b0115..d6917abfe 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -13,7 +13,8 @@ import ( ) var ( - ResumeError = fmt.Errorf("resume") + ResumeError = fmt.Errorf("resume") + ResumeFromDiskError = fmt.Errorf("resumeFromDisk") ) func (logBuffer *LogBuffer) LoopProcessLogData( @@ -34,7 +35,10 @@ func (logBuffer *LogBuffer) LoopProcessLogData( if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) } - bytesBuf = logBuffer.ReadFromBuffer(lastReadTime) + bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime) + if err == ResumeFromDiskError { + return lastReadTime, ResumeFromDiskError + } // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime) if bytesBuf == nil { if waitForDataFn() { |
