about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Chris Lu <chris.lu@gmail.com> 2020-08-06 10:04:17 -0700
committer Chris Lu <chris.lu@gmail.com> 2020-08-06 10:04:17 -0700
commit 20e2ac1add0e93710d54f41c8ba142918c60d620 (patch)
tree 4fadb3236551f8aab1037bc6f6ec5351f169fcea
parent 93ea0801ea65375c16f148c9e77056e6c145f770 (diff)
download seaweedfs-20e2ac1add0e93710d54f41c8ba142918c60d620.tar.xz
seaweedfs-20e2ac1add0e93710d54f41c8ba142918c60d620.zip
filer: store md5 metadata for files uploaded by filer
fix https://github.com/chrislusf/seaweedfs/issues/1412
-rw-r--r-- weed/filesys/filehandle.go 9
-rw-r--r-- weed/filesys/wfs.go 2
-rw-r--r-- weed/operation/upload_content.go 26
-rw-r--r-- weed/server/filer_server_handlers_write.go 21
-rw-r--r-- weed/server/filer_server_handlers_write_cipher.go 1
-rw-r--r-- weed/storage/needle/needle.go 3
-rw-r--r-- weed/util/bytes.go 17
7 files changed, 41 insertions, 38 deletions
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index ca35bfd02..b9d224fb2 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -9,11 +9,12 @@ import (
"os"
"time"
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
)
type FileHandle struct {
@@ -225,6 +226,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
fh.f.entry.Chunks = chunks
// fh.f.entryViewCache = nil
+ // special handling of one chunk md5
+ if len(chunks) == 1 {
+ }
+
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 68ad987be..9ef597024 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -82,7 +82,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
},
},
}
- cacheUniqueId := util.Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
+ cacheUniqueId := util.Base64Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
if option.CacheSizeMB > 0 {
os.MkdirAll(cacheDir, 0755)
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index cb129daa2..6fd8a60d1 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -2,7 +2,6 @@ package operation
import (
"bytes"
- "crypto/md5"
"encoding/json"
"errors"
"fmt"
@@ -23,14 +22,14 @@ import (
)
type UploadResult struct {
- Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
- Error string `json:"error,omitempty"`
- ETag string `json:"eTag,omitempty"`
- CipherKey []byte `json:"cipherKey,omitempty"`
- Mime string `json:"mime,omitempty"`
- Gzip uint32 `json:"gzip,omitempty"`
- Md5 string `json:"md5,omitempty"`
+ Name string `json:"name,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+ ETag string `json:"eTag,omitempty"`
+ CipherKey []byte `json:"cipherKey,omitempty"`
+ Mime string `json:"mime,omitempty"`
+ Gzip uint32 `json:"gzip,omitempty"`
+ ContentMd5 string `json:"contentMd5,omitempty"`
}
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
@@ -65,20 +64,12 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
- if uploadResult != nil {
- uploadResult.Md5 = util.Md5(data)
- }
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
- hash := md5.New()
- reader = io.TeeReader(reader, hash)
uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
- if uploadResult != nil {
- uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
- }
return
}
@@ -241,6 +232,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
return nil, errors.New(ret.Error)
}
ret.ETag = etag
+ ret.ContentMd5 = resp.Header.Get("Content-MD5")
return &ret, nil
}
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index da66178ce..c7833a85e 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -2,7 +2,6 @@ package weed_server
import (
"context"
- "crypto/md5"
"encoding/json"
"errors"
"fmt"
@@ -124,12 +123,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
u, _ := url.Parse(urlLocation)
- ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
+ ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
if err != nil {
return
}
- if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil {
+ if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId, ttlSeconds); err != nil {
return
}
@@ -147,7 +146,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
// update metadata in filer store
func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string,
- collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) {
+ collection string, ret *operation.UploadResult, fileId string, ttlSeconds int32) (err error) {
stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
start := time.Now()
@@ -188,7 +187,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
Collection: collection,
TtlSec: ttlSeconds,
Mime: ret.Mime,
- Md5: md5value,
+ Md5: util.Base64Md5ToBytes(ret.ContentMd5),
},
Chunks: []*filer_pb.FileChunk{{
FileId: fileId,
@@ -215,7 +214,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
}
// send request to volume server
-func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) {
+func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, err error) {
stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
start := time.Now()
@@ -223,12 +222,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
ret = &operation.UploadResult{}
- md5Hash := md5.New()
body := r.Body
- if r.Method == "PUT" {
- // only PUT or large chunked files has Md5 in attributes
- body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash))
- }
request := &http.Request{
Method: r.Method,
@@ -292,11 +286,8 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
return
}
}
- // use filer calculated md5 ETag, instead of the volume server crc ETag
- if r.Method == "PUT" {
- md5value = md5Hash.Sum(nil)
- }
ret.ETag = getEtag(resp)
+ ret.ContentMd5 = resp.Header.Get("Content-MD5")
return
}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 8413496b8..6ec06d3de 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -70,6 +70,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Collection: collection,
TtlSec: ttlSeconds,
Mime: pu.MimeType,
+ Md5: util.Base64Md5ToBytes(pu.ContentMd5),
},
Chunks: fileChunks,
}
diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go
index eb1d9537b..7c7aa3feb 100644
--- a/weed/storage/needle/needle.go
+++ b/weed/storage/needle/needle.go
@@ -48,7 +48,7 @@ func (n *Needle) String() (str string) {
return
}
-func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, md5 string, e error) {
+func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) {
n = new(Needle)
pu, e := ParseUpload(r, sizeLimit)
if e != nil {
@@ -58,6 +58,7 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit
originalSize = pu.OriginalDataSize
n.LastModified = pu.ModifiedTime
n.Ttl = pu.Ttl
+ contentMd5 = pu.ContentMd5
if len(pu.FileName) < 256 {
n.Name = []byte(pu.FileName)
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
index 0650919c0..5076c3e67 100644
--- a/weed/util/bytes.go
+++ b/weed/util/bytes.go
@@ -2,6 +2,7 @@ package util
import (
"crypto/md5"
+ "encoding/base64"
"fmt"
"io"
)
@@ -109,8 +110,20 @@ func HashToInt32(data []byte) (v int32) {
return
}
-func Md5(data []byte) string {
+func Base64Encode(data []byte) string {
+ return base64.StdEncoding.EncodeToString(data)
+}
+
+func Base64Md5(data []byte) string {
hash := md5.New()
hash.Write(data)
- return fmt.Sprintf("%x", hash.Sum(nil))
+ return Base64Encode(hash.Sum(nil))
+}
+
+func Base64Md5ToBytes(contentMd5 string) []byte {
+ data, err := base64.StdEncoding.DecodeString(contentMd5)
+ if err != nil {
+ return nil
+ }
+ return data
}