about | summary | refs | log | tree | commit | diff
path: root/weed/util
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util')
-rw-r--r-- weed/util/bounded_tree/bounded_tree.go 5
-rw-r--r-- weed/util/bytes.go 32
-rw-r--r-- weed/util/chunk_cache/chunk_cache.go 37
-rw-r--r-- weed/util/chunk_cache/chunk_cache_on_disk.go 4
-rw-r--r-- weed/util/chunk_cache/chunk_cache_on_disk_test.go 4
-rw-r--r-- weed/util/compression.go 37
-rw-r--r-- weed/util/constants.go 2
-rw-r--r-- weed/util/http_util.go 61
-rw-r--r-- weed/util/log_buffer/log_buffer.go 23
-rw-r--r-- weed/util/log_buffer/log_buffer_test.go 2
-rw-r--r-- weed/util/log_buffer/log_read.go 20
11 files changed, 172 insertions, 55 deletions
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go
index 40b9c4e47..0e023c0d1 100644
--- a/weed/util/bounded_tree/bounded_tree.go
+++ b/weed/util/bounded_tree/bounded_tree.go
@@ -15,7 +15,7 @@ type Node struct {
type BoundedTree struct {
root *Node
- sync.Mutex
+ sync.RWMutex
}
func NewBoundedTree() *BoundedTree {
@@ -131,6 +131,9 @@ func (n *Node) getChild(childName string) *Node {
func (t *BoundedTree) HasVisited(p util.FullPath) bool {
+ t.RLock()
+ defer t.RUnlock()
+
if t.root == nil {
return true
}
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
index 0650919c0..82e4ddeef 100644
--- a/weed/util/bytes.go
+++ b/weed/util/bytes.go
@@ -2,6 +2,8 @@ package util
import (
"crypto/md5"
+ "crypto/rand"
+ "encoding/base64"
"fmt"
"io"
)
@@ -109,8 +111,34 @@ func HashToInt32(data []byte) (v int32) {
return
}
-func Md5(data []byte) string {
+func Base64Encode(data []byte) string {
+ return base64.StdEncoding.EncodeToString(data)
+}
+
+func Base64Md5(data []byte) string {
+ return Base64Encode(Md5(data))
+}
+
+func Md5(data []byte) []byte {
hash := md5.New()
hash.Write(data)
- return fmt.Sprintf("%x", hash.Sum(nil))
+ return hash.Sum(nil)
+}
+
+func Md5String(data []byte) string {
+ return fmt.Sprintf("%x", Md5(data))
+}
+
+func Base64Md5ToBytes(contentMd5 string) []byte {
+ data, err := base64.StdEncoding.DecodeString(contentMd5)
+ if err != nil {
+ return nil
+ }
+ return data
+}
+
+func RandomInt32() int32 {
+ buf := make([]byte, 4)
+ rand.Read(buf)
+ return int32(BytesToUint32(buf))
}
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index 17b64fb6c..2b0c635a1 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -13,16 +13,21 @@ const (
onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit
)
+type ChunkCache interface {
+ GetChunk(fileId string, minSize uint64) (data []byte)
+ SetChunk(fileId string, data []byte)
+}
+
// a global cache for recently accessed file chunks
-type ChunkCache struct {
+type TieredChunkCache struct {
memCache *ChunkCacheInMemory
diskCaches []*OnDiskCacheLayer
sync.RWMutex
}
-func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache {
+func NewTieredChunkCache(maxEntries int64, dir string, diskSizeMB int64) *TieredChunkCache {
- c := &ChunkCache{
+ c := &TieredChunkCache{
memCache: NewChunkCacheInMemory(maxEntries),
}
c.diskCaches = make([]*OnDiskCacheLayer, 3)
@@ -33,7 +38,7 @@ func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache {
return c
}
-func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) {
+func (c *TieredChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
if c == nil {
return
}
@@ -41,14 +46,14 @@ func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) {
c.RLock()
defer c.RUnlock()
- return c.doGetChunk(fileId, chunkSize)
+ return c.doGetChunk(fileId, minSize)
}
-func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
+func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byte) {
- if chunkSize < memCacheSizeLimit {
+ if minSize < memCacheSizeLimit {
data = c.memCache.GetChunk(fileId)
- if len(data) >= int(chunkSize) {
+ if len(data) >= int(minSize) {
return data
}
}
@@ -59,21 +64,21 @@ func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
return nil
}
- if chunkSize < onDiskCacheSizeLimit0 {
+ if minSize < onDiskCacheSizeLimit0 {
data = c.diskCaches[0].getChunk(fid.Key)
- if len(data) >= int(chunkSize) {
+ if len(data) >= int(minSize) {
return data
}
}
- if chunkSize < onDiskCacheSizeLimit1 {
+ if minSize < onDiskCacheSizeLimit1 {
data = c.diskCaches[1].getChunk(fid.Key)
- if len(data) >= int(chunkSize) {
+ if len(data) >= int(minSize) {
return data
}
}
{
data = c.diskCaches[2].getChunk(fid.Key)
- if len(data) >= int(chunkSize) {
+ if len(data) >= int(minSize) {
return data
}
}
@@ -82,7 +87,7 @@ func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
}
-func (c *ChunkCache) SetChunk(fileId string, data []byte) {
+func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
if c == nil {
return
}
@@ -94,7 +99,7 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) {
c.doSetChunk(fileId, data)
}
-func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
+func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
if len(data) < memCacheSizeLimit {
c.memCache.SetChunk(fileId, data)
@@ -116,7 +121,7 @@ func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
}
-func (c *ChunkCache) Shutdown() {
+func (c *TieredChunkCache) Shutdown() {
if c == nil {
return
}
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go
index d74f87b0c..356dfe188 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk.go
@@ -63,7 +63,7 @@ func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCac
return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err)
}
- glog.V(0).Infoln("loading leveldb", v.fileName+".ldb")
+ glog.V(1).Infoln("loading leveldb", v.fileName+".ldb")
opts := &opt.Options{
BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
@@ -137,7 +137,7 @@ func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
v.fileSize += int64(types.NeedlePaddingSize - extraSize)
}
- if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil {
+ if err := v.nm.Put(key, types.ToOffset(offset), types.Size(len(data))); err != nil {
return err
}
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
index f061f2ba2..558488f18 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
@@ -16,7 +16,7 @@ func TestOnDisk(t *testing.T) {
totalDiskSizeMb := int64(32)
- cache := NewChunkCache(0, tmpDir, totalDiskSizeMb)
+ cache := NewTieredChunkCache(0, tmpDir, totalDiskSizeMb)
writeCount := 5
type test_data struct {
@@ -45,7 +45,7 @@ func TestOnDisk(t *testing.T) {
cache.Shutdown()
- cache = NewChunkCache(0, tmpDir, totalDiskSizeMb)
+ cache = NewTieredChunkCache(0, tmpDir, totalDiskSizeMb)
for i := 0; i < writeCount; i++ {
data := cache.GetChunk(testData[i].fileId, testData[i].size)
diff --git a/weed/util/compression.go b/weed/util/compression.go
index 4488e019e..cf3ac7c57 100644
--- a/weed/util/compression.go
+++ b/weed/util/compression.go
@@ -12,15 +12,44 @@ import (
"github.com/klauspost/compress/zstd"
)
+var (
+ UnsupportedCompression = fmt.Errorf("unsupported compression")
+)
+
+func MaybeGzipData(input []byte) []byte {
+ if IsGzippedContent(input) {
+ return input
+ }
+ gzipped, err := GzipData(input)
+ if err != nil {
+ return input
+ }
+ if len(gzipped)*10 > len(input)*9 {
+ return input
+ }
+ return gzipped
+}
+
+func MaybeDecompressData(input []byte) []byte {
+ uncompressed, err := DecompressData(input)
+ if err != nil {
+ if err != UnsupportedCompression {
+ glog.Errorf("decompressed data: %v", err)
+ }
+ return input
+ }
+ return uncompressed
+}
+
func GzipData(input []byte) ([]byte, error) {
buf := new(bytes.Buffer)
w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed)
if _, err := w.Write(input); err != nil {
- glog.V(2).Infoln("error compressing data:", err)
+ glog.V(2).Infof("error gzip data: %v", err)
return nil, err
}
if err := w.Close(); err != nil {
- glog.V(2).Infoln("error closing compressed data:", err)
+ glog.V(2).Infof("error closing gzipped data: %v", err)
return nil, err
}
return buf.Bytes(), nil
@@ -39,7 +68,7 @@ func DecompressData(input []byte) ([]byte, error) {
if IsZstdContent(input) {
return unzstdData(input)
}
- return nil, fmt.Errorf("unsupported compression")
+ return input, UnsupportedCompression
}
func ungzipData(input []byte) ([]byte, error) {
@@ -48,7 +77,7 @@ func ungzipData(input []byte) ([]byte, error) {
defer r.Close()
output, err := ioutil.ReadAll(r)
if err != nil {
- glog.V(2).Infoln("error uncompressing data:", err)
+ glog.V(2).Infof("error ungzip data: %v", err)
}
return output, err
}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 9ee9758e6..6734af7d4 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 86)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 00)
COMMIT = ""
)
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index c67eb3276..eef24b930 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -68,14 +68,36 @@ func Post(url string, values url.Values) ([]byte, error) {
// github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
// may need increasing http.Client.Timeout
func Get(url string) ([]byte, error) {
- r, err := client.Get(url)
+
+ request, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ // request is nil when the URL cannot be parsed; touching its headers would panic
+ return nil, err
+ }
+ request.Header.Add("Accept-Encoding", "gzip")
+
+ response, err := client.Do(request)
if err != nil {
return nil, err
}
- defer r.Body.Close()
- b, err := ioutil.ReadAll(r.Body)
- if r.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", url, r.Status)
+ defer response.Body.Close()
+
+ var reader io.ReadCloser
+ switch response.Header.Get("Content-Encoding") {
+ case "gzip":
+ reader, err = gzip.NewReader(response.Body)
+ if err != nil {
+ // reader holds a nil *gzip.Reader here; Close or Read on it would panic
+ return nil, fmt.Errorf("%s: %s: %v", url, response.Status, err)
+ }
+ defer reader.Close()
+ default:
+ reader = response.Body
+ }
+
+ b, err := ioutil.ReadAll(reader)
+ if response.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", url, response.Status)
}
if err != nil {
return nil, err
@@ -160,7 +174,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
return readFn(r.Body)
}
-func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.ReadCloser, e error) {
+func DownloadFile(fileUrl string) (filename string, header http.Header, resp *http.Response, e error) {
response, err := client.Get(fileUrl)
if err != nil {
return "", nil, nil, err
@@ -174,7 +188,7 @@ func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.Re
filename = strings.Trim(filename, "\"")
}
}
- rc = response.Body
+ resp = response
return
}
@@ -269,7 +283,9 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
return err
}
- if !isFullChunk {
+ if isFullChunk {
+ req.Header.Add("Accept-Encoding", "gzip")
+ } else {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
}
@@ -282,13 +298,27 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
return fmt.Errorf("%s: %s", fileUrl, r.Status)
}
+ var reader io.ReadCloser
+ contentEncoding := r.Header.Get("Content-Encoding")
+ switch contentEncoding {
+ case "gzip":
+ reader, err = gzip.NewReader(r.Body)
+ if err != nil {
+ // reader holds a nil *gzip.Reader here; Close or Read on it would panic
+ return fmt.Errorf("gzip %s: %v", fileUrl, err)
+ }
+ defer reader.Close()
+ default:
+ reader = r.Body
+ }
+
var (
m int
)
buf := make([]byte, 64*1024)
for {
- m, err = r.Body.Read(buf)
+ m, err = reader.Read(buf)
fn(buf[:m])
if err == io.EOF {
return nil
@@ -312,7 +338,7 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool
if isContentCompressed {
decryptedData, err = DecompressData(decryptedData)
if err != nil {
- return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err)
+ glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
}
}
if len(decryptedData) < int(offset)+size {
@@ -334,17 +360,35 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e
}
if rangeHeader != "" {
req.Header.Add("Range", rangeHeader)
+ } else {
+ req.Header.Add("Accept-Encoding", "gzip")
}
r, err := client.Do(req)
if err != nil {
return nil, err
}
if r.StatusCode >= 400 {
+ CloseResponse(r)
return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
- return r.Body, nil
+ if r.Header.Get("Content-Encoding") != "gzip" {
+ return r.Body, nil
+ }
+
+ gzReader, err := gzip.NewReader(r.Body)
+ if err != nil {
+ CloseResponse(r)
+ return nil, fmt.Errorf("gzip %s: %v", fileUrl, err)
+ }
+ // The caller owns the returned stream: Read pulls decompressed bytes from
+ // the gzip reader, Close closes the response body. Nothing may be closed
+ // here, or the caller would receive an already-closed reader.
+ return struct {
+ io.Reader
+ io.Closer
+ }{gzReader, r.Body}, nil
}
func CloseResponse(resp *http.Response) {
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index cb9565fb2..e4310b5c5 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
return lb
}
-func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
m.Lock()
defer func() {
@@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
}()
// need to put the timestamp inside the lock
- ts := time.Now()
- tsNs := ts.UnixNano()
- if m.lastTsNs >= tsNs {
+ var ts time.Time
+ if eventTsNs == 0 {
+ ts = time.Now()
+ eventTsNs = ts.UnixNano()
+ } else {
+ ts = time.Unix(0, eventTsNs)
+ }
+ if m.lastTsNs >= eventTsNs {
// this is unlikely to happen, but just in case
- tsNs = m.lastTsNs + 1
- ts = time.Unix(0, tsNs)
+ eventTsNs = m.lastTsNs + 1
+ ts = time.Unix(0, eventTsNs)
}
- m.lastTsNs = tsNs
+ m.lastTsNs = eventTsNs
logEntry := &filer_pb.LogEntry{
- TsNs: tsNs,
+ TsNs: eventTsNs,
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
}
@@ -249,7 +254,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
return nil
}
-func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) {
+func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
bufferPool.Put(b)
}
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index f9ccc95c2..3d77afb18 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ {
rand.Read(buf)
- lb.AddToBuffer(nil, buf)
+ lb.AddToBuffer(nil, buf, 0)
}
receivedmessageCount := 0
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 2b73a8064..57f4b0115 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -2,6 +2,7 @@ package log_buffer
import (
"bytes"
+ "fmt"
"time"
"github.com/golang/protobuf/proto"
@@ -11,23 +12,27 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+var (
+ ResumeError = fmt.Errorf("resume")
+)
+
func (logBuffer *LogBuffer) LoopProcessLogData(
startTreadTime time.Time,
waitForDataFn func() bool,
- eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+ eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) {
// loop through all messages
var bytesBuf *bytes.Buffer
- lastReadTime := startTreadTime
+ lastReadTime = startTreadTime
defer func() {
if bytesBuf != nil {
- logBuffer.ReleaseMeory(bytesBuf)
+ logBuffer.ReleaseMemory(bytesBuf)
}
}()
for {
if bytesBuf != nil {
- logBuffer.ReleaseMeory(bytesBuf)
+ logBuffer.ReleaseMemory(bytesBuf)
}
bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
// fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
@@ -48,10 +53,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
+ if pos+4+int(size) > len(buf) {
+ err = ResumeError
+ glog.Errorf("LoopProcessLogData: read buffer %v read %d [%d,%d) from [0,%d)", lastReadTime, batchSize, pos, pos+int(size)+4, len(buf))
+ return
+ }
entryData := buf[pos+4 : pos+4+int(size)]
- // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf))
-
logEntry := &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)