aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go1
-rw-r--r--weed/operation/buffer_pool.go24
-rw-r--r--weed/operation/upload_content.go16
3 files changed, 35 insertions, 6 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index cc1359961..ffd3e4938 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -86,6 +86,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
continue
}
+ break
}
return ret, lastError
diff --git a/weed/operation/buffer_pool.go b/weed/operation/buffer_pool.go
new file mode 100644
index 000000000..9cbe4787f
--- /dev/null
+++ b/weed/operation/buffer_pool.go
@@ -0,0 +1,24 @@
+package operation
+
+import (
+ "github.com/valyala/bytebufferpool"
+ "sync/atomic"
+)
+
+var bufferCounter int64
+
+func GetBuffer() *bytebufferpool.ByteBuffer {
+ defer func() {
+ atomic.AddInt64(&bufferCounter, 1)
+ // println("+", bufferCounter)
+ }()
+ return bytebufferpool.Get()
+}
+
+func PutBuffer(buf *bytebufferpool.ByteBuffer) {
+ defer func() {
+ atomic.AddInt64(&bufferCounter, -1)
+ // println("-", bufferCounter)
+ }()
+ bytebufferpool.Put(buf)
+}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 70428bb07..944186eeb 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -11,7 +11,6 @@ import (
"net/http"
"net/textproto"
"path/filepath"
- "runtime/debug"
"strings"
"time"
@@ -19,7 +18,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/valyala/bytebufferpool"
)
type UploadResult struct {
@@ -31,6 +29,7 @@ type UploadResult struct {
Mime string `json:"mime,omitempty"`
Gzip uint32 `json:"gzip,omitempty"`
ContentMd5 string `json:"contentMd5,omitempty"`
+ RetryCount int `json:"-"`
}
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
@@ -96,6 +95,7 @@ func retriedUploadData(uploadUrl string, filename string, cipher bool, data []by
for i := 0; i < 3; i++ {
uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
if err == nil {
+ uploadResult.RetryCount = i
return
} else {
glog.Warningf("uploading to %s: %v", uploadUrl, err)
@@ -188,8 +188,8 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
}
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
- buf := bytebufferpool.Get()
- defer bytebufferpool.Put(buf)
+ buf := GetBuffer()
+ defer PutBuffer(buf)
body_writer := multipart.NewWriter(buf)
h := make(textproto.MIMEHeader)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename)))
@@ -234,8 +234,12 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
// print("+")
resp, post_err := HttpClient.Do(req)
if post_err != nil {
- glog.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err)
- debug.PrintStack()
+ if strings.Contains(post_err.Error(), "connection reset by peer") ||
+ strings.Contains(post_err.Error(), "use of closed network connection") {
+ resp, post_err = HttpClient.Do(req)
+ }
+ }
+ if post_err != nil {
return nil, fmt.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err)
}
// print("-")