aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/bounded_tree/bounded_tree.go40
-rw-r--r--weed/util/bounded_tree/bounded_tree_test.go4
-rw-r--r--weed/util/constants.go2
-rw-r--r--weed/util/http_util.go38
-rw-r--r--weed/util/net_timeout.go3
5 files changed, 54 insertions, 33 deletions
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go
index 0e023c0d1..0e8af2520 100644
--- a/weed/util/bounded_tree/bounded_tree.go
+++ b/weed/util/bounded_tree/bounded_tree.go
@@ -16,13 +16,15 @@ type Node struct {
type BoundedTree struct {
root *Node
sync.RWMutex
+ baseDir util.FullPath
}
-func NewBoundedTree() *BoundedTree {
+func NewBoundedTree(baseDir util.FullPath) *BoundedTree {
return &BoundedTree{
root: &Node{
Name: "/",
},
+ baseDir: baseDir,
}
}
@@ -32,21 +34,29 @@ type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err erro
// No action if the directory has been visited before or does not exist.
// A leaf node, which has no children, represents a directory not visited.
// A non-leaf node or a non-existing node represents a directory already visited, or does not need to visit.
-func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) {
+func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (visitErr error) {
t.Lock()
defer t.Unlock()
if t.root == nil {
return
}
+ if t.baseDir != "/" {
+ p = p[len(t.baseDir):]
+ }
components := p.Split()
// fmt.Printf("components %v %d\n", components, len(components))
- if canDelete := t.ensureVisited(t.root, util.FullPath("/"), components, 0, visitFn); canDelete {
+ canDelete, err := t.ensureVisited(t.root, t.baseDir, components, 0, visitFn)
+ if err != nil {
+ return err
+ }
+ if canDelete {
t.root = nil
}
+ return nil
}
-func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool) {
+func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool, visitErr error) {
// println("ensureVisited", currentPath, i)
@@ -60,15 +70,20 @@ func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, componen
} else {
// fmt.Printf("ensure %v\n", currentPath)
- children, err := visitFn(currentPath)
+ filerPath := currentPath
+ if t.baseDir != "/" {
+ filerPath = t.baseDir + filerPath
+ }
+
+ children, err := visitFn(filerPath)
if err != nil {
glog.V(0).Infof("failed to visit %s: %v", currentPath, err)
- return
+ return false, err
}
if len(children) == 0 {
// fmt.Printf(" canDelete %v without children\n", currentPath)
- return true
+ return true, nil
}
n.Children = make(map[string]*Node)
@@ -93,19 +108,22 @@ func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, componen
}
// fmt.Printf(" ensureVisited %v %v\n", currentPath, toVisitNode.Name)
-
- if canDelete := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn); canDelete {
+ canDelete, childVisitErr := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn)
+ if childVisitErr != nil {
+ return false, childVisitErr
+ }
+ if canDelete {
// fmt.Printf(" delete %v %v\n", currentPath, components[i])
delete(n.Children, components[i])
if len(n.Children) == 0 {
// fmt.Printf(" canDelete %v\n", currentPath)
- return true
+ return true, nil
}
}
- return false
+ return false, nil
}
diff --git a/weed/util/bounded_tree/bounded_tree_test.go b/weed/util/bounded_tree/bounded_tree_test.go
index 0b9c3177a..465f1cc9c 100644
--- a/weed/util/bounded_tree/bounded_tree_test.go
+++ b/weed/util/bounded_tree/bounded_tree_test.go
@@ -52,7 +52,7 @@ func TestBoundedTree(t *testing.T) {
// g
// h
- tree := NewBoundedTree()
+ tree := NewBoundedTree(util.FullPath("/"))
tree.EnsureVisited(util.FullPath("/a/b/c"), visitFn)
@@ -100,7 +100,7 @@ func TestEmptyBoundedTree(t *testing.T) {
// g
// h
- tree := NewBoundedTree()
+ tree := NewBoundedTree(util.FullPath("/"))
visitFn := func(path util.FullPath) (childDirectories []string, err error) {
fmt.Printf(" visit %v ...\n", path)
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 0f3fd52c7..3b222fbaa 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 03)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 04)
COMMIT = ""
)
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index eef24b930..da0b3d849 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -67,14 +67,14 @@ func Post(url string, values url.Values) ([]byte, error) {
// github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
// may need increasing http.Client.Timeout
-func Get(url string) ([]byte, error) {
+func Get(url string) ([]byte, bool, error) {
request, err := http.NewRequest("GET", url, nil)
request.Header.Add("Accept-Encoding", "gzip")
response, err := client.Do(request)
if err != nil {
- return nil, err
+ return nil, true, err
}
defer response.Body.Close()
@@ -89,12 +89,13 @@ func Get(url string) ([]byte, error) {
b, err := ioutil.ReadAll(reader)
if response.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", url, response.Status)
+ retryable := response.StatusCode >= 500
+ return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
}
if err != nil {
- return nil, err
+ return nil, false, err
}
- return b, nil
+ return b, false, nil
}
func Head(url string) (http.Header, error) {
@@ -207,7 +208,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
if cipherKey != nil {
var n int
- err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
+ _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
n = copy(buf, data)
})
return int64(n), err
@@ -272,7 +273,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
return n, err
}
-func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil {
return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
@@ -280,7 +281,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
- return err
+ return false, err
}
if isFullChunk {
@@ -291,11 +292,12 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
r, err := client.Do(req)
if err != nil {
- return err
+ return true, err
}
defer CloseResponse(r)
if r.StatusCode >= 400 {
- return fmt.Errorf("%s: %s", fileUrl, r.Status)
+ retryable = r.StatusCode >= 500
+ return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
var reader io.ReadCloser
@@ -317,23 +319,23 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
m, err = reader.Read(buf)
fn(buf[:m])
if err == io.EOF {
- return nil
+ return false, nil
}
if err != nil {
- return err
+ return false, err
}
}
}
-func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
- encryptedData, err := Get(fileUrl)
+func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
+ encryptedData, retryable, err := Get(fileUrl)
if err != nil {
- return fmt.Errorf("fetch %s: %v", fileUrl, err)
+ return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
}
decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
if err != nil {
- return fmt.Errorf("decrypt %s: %v", fileUrl, err)
+ return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
}
if isContentCompressed {
decryptedData, err = DecompressData(decryptedData)
@@ -342,14 +344,14 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool
}
}
if len(decryptedData) < int(offset)+size {
- return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
+ return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
}
if isFullChunk {
fn(decryptedData)
} else {
fn(decryptedData[int(offset) : int(offset)+size])
}
- return nil
+ return false, nil
}
func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go
index f057a8f5b..e8075c297 100644
--- a/weed/util/net_timeout.go
+++ b/weed/util/net_timeout.go
@@ -54,7 +54,8 @@ func (c *Conn) Read(b []byte) (count int, e error) {
func (c *Conn) Write(b []byte) (count int, e error) {
if c.WriteTimeout != 0 {
- err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
+ // minimum 4KB/s
+ err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(len(b)/40000+1)))
if err != nil {
return 0, err
}