diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-08-31 18:02:11 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-08-31 18:02:11 +0800 |
| commit | 44a56b158e4637bd70d3fcf8ddc9107973b60558 (patch) | |
| tree | 4cf59d290d346c6ea06d617531c90d2653f3bc03 /weed/util | |
| parent | b0d6330cf44dbb0664f6ede0dbc82865879dcfe0 (diff) | |
| parent | 408e339c53b9b6626e81f1c3f0f2399494bf4ce6 (diff) | |
| download | seaweedfs-44a56b158e4637bd70d3fcf8ddc9107973b60558.tar.xz seaweedfs-44a56b158e4637bd70d3fcf8ddc9107973b60558.zip | |
Merge pull request #13 from chrislusf/master
sync
Diffstat (limited to 'weed/util')
| -rw-r--r-- | weed/util/bounded_tree/bounded_tree.go | 5 | ||||
| -rw-r--r-- | weed/util/bytes.go | 7 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache.go | 2 | ||||
| -rw-r--r-- | weed/util/constants.go | 2 | ||||
| -rw-r--r-- | weed/util/http_util.go | 1 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 23 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer_test.go | 2 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 4 |
8 files changed, 31 insertions, 15 deletions
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go index 40b9c4e47..0e023c0d1 100644 --- a/weed/util/bounded_tree/bounded_tree.go +++ b/weed/util/bounded_tree/bounded_tree.go @@ -15,7 +15,7 @@ type Node struct { type BoundedTree struct { root *Node - sync.Mutex + sync.RWMutex } func NewBoundedTree() *BoundedTree { @@ -131,6 +131,9 @@ func (n *Node) getChild(childName string) *Node { func (t *BoundedTree) HasVisited(p util.FullPath) bool { + t.RLock() + defer t.RUnlock() + if t.root == nil { return true } diff --git a/weed/util/bytes.go b/weed/util/bytes.go index 890d50586..82e4ddeef 100644 --- a/weed/util/bytes.go +++ b/weed/util/bytes.go @@ -2,6 +2,7 @@ package util import ( "crypto/md5" + "crypto/rand" "encoding/base64" "fmt" "io" @@ -135,3 +136,9 @@ func Base64Md5ToBytes(contentMd5 string) []byte { } return data } + +func RandomInt32() int32 { + buf := make([]byte, 4) + rand.Read(buf) + return int32(BytesToUint32(buf)) +} diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index a1a054215..2b0c635a1 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -94,7 +94,7 @@ func (c *TieredChunkCache) SetChunk(fileId string, data []byte) { c.Lock() defer c.Unlock() - glog.V(5).Infof("SetChunk %s size %d\n", fileId, len(data)) + glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data)) c.doSetChunk(fileId, data) } diff --git a/weed/util/constants.go b/weed/util/constants.go index e614a5d24..d48b9e32d 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 90) + VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 92) COMMIT = "" ) diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 5159fcd17..7cc64ea85 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -368,6 +368,7 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e if err != nil { return nil, err } + defer CloseResponse(r) if r.StatusCode >= 400 { return nil, fmt.Errorf("%s: %s", fileUrl, r.Status) } diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index cb9565fb2..e4310b5c5 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime return lb } -func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { m.Lock() defer func() { @@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { }() // need to put the timestamp inside the lock - ts := time.Now() - tsNs := ts.UnixNano() - if m.lastTsNs >= tsNs { + var ts time.Time + if eventTsNs == 0 { + ts = time.Now() + eventTsNs = ts.UnixNano() + } else { + ts = time.Unix(0, eventTsNs) + } + if m.lastTsNs >= eventTsNs { // this is unlikely to happen, but just in case - tsNs = m.lastTsNs + 1 - ts = time.Unix(0, tsNs) + eventTsNs = m.lastTsNs + 1 + ts = time.Unix(0, eventTsNs) } - m.lastTsNs = tsNs + m.lastTsNs = eventTsNs logEntry := &filer_pb.LogEntry{ - TsNs: tsNs, + TsNs: eventTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } @@ -249,7 +254,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu return nil } -func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) { +func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) { bufferPool.Put(b) } diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index f9ccc95c2..3d77afb18 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { var buf = make([]byte, messageSize) for i := 0; i < messageCount; i++ { rand.Read(buf) - lb.AddToBuffer(nil, buf) + lb.AddToBuffer(nil, buf, 0) } receivedmessageCount := 0 diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 2b73a8064..f0486ac46 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -20,14 +20,14 @@ func (logBuffer *LogBuffer) LoopProcessLogData( lastReadTime := startTreadTime defer func() { if bytesBuf != nil { - logBuffer.ReleaseMeory(bytesBuf) + logBuffer.ReleaseMemory(bytesBuf) } }() for { if bytesBuf != nil { - logBuffer.ReleaseMeory(bytesBuf) + logBuffer.ReleaseMemory(bytesBuf) } bytesBuf = logBuffer.ReadFromBuffer(lastReadTime) // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime) |
