aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2020-08-31 18:02:11 +0800
committerGitHub <noreply@github.com>2020-08-31 18:02:11 +0800
commit44a56b158e4637bd70d3fcf8ddc9107973b60558 (patch)
tree4cf59d290d346c6ea06d617531c90d2653f3bc03 /weed/util
parentb0d6330cf44dbb0664f6ede0dbc82865879dcfe0 (diff)
parent408e339c53b9b6626e81f1c3f0f2399494bf4ce6 (diff)
downloadseaweedfs-44a56b158e4637bd70d3fcf8ddc9107973b60558.tar.xz
seaweedfs-44a56b158e4637bd70d3fcf8ddc9107973b60558.zip
Merge pull request #13 from chrislusf/master
sync
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/bounded_tree/bounded_tree.go5
-rw-r--r--weed/util/bytes.go7
-rw-r--r--weed/util/chunk_cache/chunk_cache.go2
-rw-r--r--weed/util/constants.go2
-rw-r--r--weed/util/http_util.go1
-rw-r--r--weed/util/log_buffer/log_buffer.go23
-rw-r--r--weed/util/log_buffer/log_buffer_test.go2
-rw-r--r--weed/util/log_buffer/log_read.go4
8 files changed, 31 insertions, 15 deletions
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go
index 40b9c4e47..0e023c0d1 100644
--- a/weed/util/bounded_tree/bounded_tree.go
+++ b/weed/util/bounded_tree/bounded_tree.go
@@ -15,7 +15,7 @@ type Node struct {
type BoundedTree struct {
root *Node
- sync.Mutex
+ sync.RWMutex
}
func NewBoundedTree() *BoundedTree {
@@ -131,6 +131,9 @@ func (n *Node) getChild(childName string) *Node {
func (t *BoundedTree) HasVisited(p util.FullPath) bool {
+ t.RLock()
+ defer t.RUnlock()
+
if t.root == nil {
return true
}
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
index 890d50586..82e4ddeef 100644
--- a/weed/util/bytes.go
+++ b/weed/util/bytes.go
@@ -2,6 +2,7 @@ package util
import (
"crypto/md5"
+ "crypto/rand"
"encoding/base64"
"fmt"
"io"
@@ -135,3 +136,9 @@ func Base64Md5ToBytes(contentMd5 string) []byte {
}
return data
}
+
+func RandomInt32() int32 {
+ buf := make([]byte, 4)
+ rand.Read(buf)
+ return int32(BytesToUint32(buf))
+}
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index a1a054215..2b0c635a1 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -94,7 +94,7 @@ func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
c.Lock()
defer c.Unlock()
- glog.V(5).Infof("SetChunk %s size %d\n", fileId, len(data))
+ glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data))
c.doSetChunk(fileId, data)
}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index e614a5d24..d48b9e32d 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 90)
+ VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 92)
COMMIT = ""
)
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 5159fcd17..7cc64ea85 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -368,6 +368,7 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e
if err != nil {
return nil, err
}
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index cb9565fb2..e4310b5c5 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
return lb
}
-func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
m.Lock()
defer func() {
@@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
}()
// need to put the timestamp inside the lock
- ts := time.Now()
- tsNs := ts.UnixNano()
- if m.lastTsNs >= tsNs {
+ var ts time.Time
+ if eventTsNs == 0 {
+ ts = time.Now()
+ eventTsNs = ts.UnixNano()
+ } else {
+ ts = time.Unix(0, eventTsNs)
+ }
+ if m.lastTsNs >= eventTsNs {
// this is unlikely to happen, but just in case
- tsNs = m.lastTsNs + 1
- ts = time.Unix(0, tsNs)
+ eventTsNs = m.lastTsNs + 1
+ ts = time.Unix(0, eventTsNs)
}
- m.lastTsNs = tsNs
+ m.lastTsNs = eventTsNs
logEntry := &filer_pb.LogEntry{
- TsNs: tsNs,
+ TsNs: eventTsNs,
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
}
@@ -249,7 +254,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
return nil
}
-func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) {
+func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
bufferPool.Put(b)
}
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index f9ccc95c2..3d77afb18 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ {
rand.Read(buf)
- lb.AddToBuffer(nil, buf)
+ lb.AddToBuffer(nil, buf, 0)
}
receivedmessageCount := 0
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 2b73a8064..f0486ac46 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -20,14 +20,14 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
lastReadTime := startTreadTime
defer func() {
if bytesBuf != nil {
- logBuffer.ReleaseMeory(bytesBuf)
+ logBuffer.ReleaseMemory(bytesBuf)
}
}()
for {
if bytesBuf != nil {
- logBuffer.ReleaseMeory(bytesBuf)
+ logBuffer.ReleaseMemory(bytesBuf)
}
bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
// fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)