aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/chunk_cache/chunk_cache_in_memory.go2
-rw-r--r--weed/util/config.go65
-rw-r--r--weed/util/constants.go2
-rw-r--r--weed/util/log_buffer/log_buffer.go24
-rw-r--r--weed/util/log_buffer/log_read.go8
5 files changed, 78 insertions, 23 deletions
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
index 931e45e9a..1eb00e1fa 100644
--- a/weed/util/chunk_cache/chunk_cache_in_memory.go
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -3,7 +3,7 @@ package chunk_cache
import (
"time"
- "github.com/karlseguin/ccache"
+ "github.com/karlseguin/ccache/v2"
)
// a global cache for recently accessed file chunks
diff --git a/weed/util/config.go b/weed/util/config.go
index 94e621e34..ee805f26a 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -2,6 +2,7 @@ package util
import (
"strings"
+ "sync"
"github.com/spf13/viper"
@@ -28,11 +29,11 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
- logLevel := glog.Level(0)
if strings.Contains(err.Error(), "Not Found") {
- logLevel = 1
+ glog.V(1).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
+ } else {
+ glog.Fatalf("Reading %s: %v", viper.ConfigFileUsed(), err)
}
- glog.V(logLevel).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
if required {
glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
"\n\nPlease use this command to generate the default %s.toml file\n"+
@@ -46,11 +47,55 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
return true
}
-func GetViper() *viper.Viper {
- v := &viper.Viper{}
- *v = *viper.GetViper()
- v.AutomaticEnv()
- v.SetEnvPrefix("weed")
- v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
- return v
+type ViperProxy struct {
+ *viper.Viper
+ sync.Mutex
+}
+
+var (
+ vp = &ViperProxy{}
+)
+
+func (vp *ViperProxy) SetDefault(key string, value interface{}) {
+ vp.Lock()
+ defer vp.Unlock()
+ vp.Viper.SetDefault(key, value)
+}
+
+func (vp *ViperProxy) GetString(key string) string {
+ vp.Lock()
+ defer vp.Unlock()
+ return vp.Viper.GetString(key)
+}
+
+func (vp *ViperProxy) GetBool(key string) bool {
+ vp.Lock()
+ defer vp.Unlock()
+ return vp.Viper.GetBool(key)
+}
+
+func (vp *ViperProxy) GetInt(key string) int {
+ vp.Lock()
+ defer vp.Unlock()
+ return vp.Viper.GetInt(key)
+}
+
+func (vp *ViperProxy) GetStringSlice(key string) []string {
+ vp.Lock()
+ defer vp.Unlock()
+ return vp.Viper.GetStringSlice(key)
+}
+
+func GetViper() *ViperProxy {
+ vp.Lock()
+ defer vp.Unlock()
+
+ if vp.Viper == nil {
+ vp.Viper = viper.GetViper()
+ vp.AutomaticEnv()
+ vp.SetEnvPrefix("weed")
+ vp.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
+ }
+
+ return vp
}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 95370746b..4e6a28334 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 17)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 21)
COMMIT = ""
)
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index e4310b5c5..f84c674ff 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -28,6 +28,7 @@ type LogBuffer struct {
pos int
startTime time.Time
stopTime time.Time
+ lastFlushTime time.Time
sizeBuf []byte
flushInterval time.Duration
flushFn func(startTime, stopTime time.Time, buf []byte)
@@ -129,6 +130,7 @@ func (m *LogBuffer) loopFlush() {
// fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes()))
m.flushFn(d.startTime, d.stopTime, d.data.Bytes())
d.releaseMemory()
+ m.lastFlushTime = d.stopTime
}
}
}
@@ -174,10 +176,14 @@ func (d *dataToFlush) releaseMemory() {
bufferPool.Put(d.data)
}
-func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) {
+func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer, err error) {
m.RLock()
defer m.RUnlock()
+ if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) {
+ return nil, ResumeFromDiskError
+ }
+
/*
fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers))
for i, prevBuf := range m.prevBuffers.buffers {
@@ -186,11 +192,11 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
*/
if lastReadTime.Equal(m.stopTime) {
- return nil
+ return nil, nil
}
if lastReadTime.After(m.stopTime) {
// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
- return nil
+ return nil, nil
}
if lastReadTime.Before(m.startTime) {
// println("checking ", lastReadTime.UnixNano())
@@ -198,19 +204,19 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
if buf.startTime.After(lastReadTime) {
if i == 0 {
// println("return the earliest in memory", buf.startTime.UnixNano())
- return copiedBytes(buf.buf[:buf.size])
+ return copiedBytes(buf.buf[:buf.size]), nil
}
// println("return the", i, "th in memory", buf.startTime.UnixNano())
- return copiedBytes(buf.buf[:buf.size])
+ return copiedBytes(buf.buf[:buf.size]), nil
}
if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) {
pos := buf.locateByTs(lastReadTime)
// fmt.Printf("locate buffer[%d] pos %d\n", i, pos)
- return copiedBytes(buf.buf[pos:buf.size])
+ return copiedBytes(buf.buf[pos:buf.size]), nil
}
}
// println("return the current buf", lastReadTime.UnixNano())
- return copiedBytes(m.buf[:m.pos])
+ return copiedBytes(m.buf[:m.pos]), nil
}
lastTs := lastReadTime.UnixNano()
@@ -243,7 +249,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
}
if prevT <= lastTs {
// fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos)
- return copiedBytes(m.buf[pos:m.pos])
+ return copiedBytes(m.buf[pos:m.pos]), nil
}
h = mid
}
@@ -251,7 +257,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
}
// FIXME: this could be that the buffer has been flushed already
- return nil
+ return nil, nil
}
func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 57f4b0115..d6917abfe 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -13,7 +13,8 @@ import (
)
var (
- ResumeError = fmt.Errorf("resume")
+ ResumeError = fmt.Errorf("resume")
+ ResumeFromDiskError = fmt.Errorf("resumeFromDisk")
)
func (logBuffer *LogBuffer) LoopProcessLogData(
@@ -34,7 +35,10 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
- bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
+ bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime)
+ if err == ResumeFromDiskError {
+ return lastReadTime, ResumeFromDiskError
+ }
// fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
if bytesBuf == nil {
if waitForDataFn() {