aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/operation/submit.go6
-rw-r--r--weed/operation/upload_content.go21
-rw-r--r--weed/server/common.go4
-rw-r--r--weed/server/filer_server_handlers_write.go8
-rw-r--r--weed/server/volume_server_handlers_read.go13
-rw-r--r--weed/storage/needle.go53
-rw-r--r--weed/storage/needle_read_write.go33
-rw-r--r--weed/storage/store.go1
-rw-r--r--weed/topology/store_replicate.go15
9 files changed, 121 insertions, 33 deletions
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 1de6b544a..75d5afbde 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -155,7 +155,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
cm.DeleteChunks(master)
}
} else {
- ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
+ ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, nil, jwt)
if e != nil {
return 0, e
}
@@ -180,7 +180,7 @@ func upload_one_chunk(filename string, reader io.Reader, master,
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
- "application/octet-stream", jwt)
+ "application/octet-stream", nil, jwt)
if uploadError != nil {
return fid, 0, uploadError
}
@@ -198,6 +198,6 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
- _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt)
+ _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
return e
}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index a87784cad..30c7f1ea3 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -36,13 +36,13 @@ func init() {
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
-func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
+func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
return upload_content(uploadUrl, func(w io.Writer) (err error) {
_, err = io.Copy(w, reader)
return
- }, filename, isGzipped, mtype, jwt)
+ }, filename, isGzipped, mtype, pairMap, jwt)
}
-func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
+func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf)
h := make(textproto.MIMEHeader)
@@ -59,6 +59,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if jwt != "" {
h.Set("Authorization", "BEARER "+string(jwt))
}
+
file_writer, cp_err := body_writer.CreatePart(h)
if cp_err != nil {
glog.V(0).Infoln("error creating form file", cp_err.Error())
@@ -73,7 +74,17 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
glog.V(0).Infoln("error closing body", err)
return nil, err
}
- resp, post_err := client.Post(uploadUrl, content_type, body_buf)
+
+ req, postErr := http.NewRequest("POST", uploadUrl, body_buf)
+ if postErr != nil {
+ glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error())
+ return nil, postErr
+ }
+ req.Header.Set("Content-Type", content_type)
+ for k, v := range pairMap {
+ req.Header.Set(k, v)
+ }
+ resp, post_err := client.Do(req)
if post_err != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
return nil, post_err
@@ -86,7 +97,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
var ret UploadResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
- glog.V(0).Infoln("failing to read upload resonse", uploadUrl, string(resp_body))
+ glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body))
return nil, unmarshal_err
}
if ret.Error != "" {
diff --git a/weed/server/common.go b/weed/server/common.go
index dcd31f823..c5956143f 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -86,7 +86,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
- fname, data, mimeType, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r)
+ fname, data, mimeType, pairMap, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return
@@ -112,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("upload file to store", url)
- uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt)
+ uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, jwt)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 464cb81ef..80f222ce6 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -13,6 +13,8 @@ import (
"net/http"
"net/textproto"
"net/url"
+ "path"
+ "strconv"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -20,8 +22,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
- "path"
- "strconv"
)
type FilerPostResult struct {
@@ -112,7 +112,7 @@ func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Re
if r.Method == "PUT" {
buf, _ := ioutil.ReadAll(r.Body)
r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
- fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r)
+ fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r)
if pe != nil {
glog.V(0).Infoln("failing to parse post body", pe.Error())
writeJsonError(w, r, http.StatusInternalServerError, pe)
@@ -521,7 +521,7 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
err = nil
ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
- uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId))
+ uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, fs.jwt(fileId))
if uploadResult != nil {
glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 6944e79e0..2a273c595 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -12,6 +12,8 @@ import (
"strings"
"time"
+ "encoding/json"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -94,6 +96,17 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
w.Header().Set("Etag", etag)
+ if n.HasPairs() {
+ pairMap := make(map[string]string)
+ err = json.Unmarshal(n.Pairs, &pairMap)
+ if err != nil {
+ glog.V(0).Infoln("Unmarshal pairs error:", err)
+ }
+ for k, v := range pairMap {
+ w.Header().Set(k, v)
+ }
+ }
+
if vs.tryHandleChunkedFile(n, filename, w, r) {
return
}
diff --git a/weed/storage/needle.go b/weed/storage/needle.go
index 1d306395e..5bafe2f62 100644
--- a/weed/storage/needle.go
+++ b/weed/storage/needle.go
@@ -1,6 +1,7 @@
package storage
import (
+ "encoding/json"
"fmt"
"io/ioutil"
"math"
@@ -22,6 +23,7 @@ const (
NeedleChecksumSize = 4
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
TombstoneFileSize = math.MaxUint32
+ PairNamePrefix = "Seaweed-"
)
/*
@@ -40,6 +42,8 @@ type Needle struct {
Name []byte `comment:"maximum 256 characters"` //version2
MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2
+ PairsSize uint16 //version2
+ Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"`
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
Ttl *TTL
@@ -55,8 +59,15 @@ func (n *Needle) String() (str string) {
}
func ParseUpload(r *http.Request) (
- fileName string, data []byte, mimeType string, isGzipped bool,
+ fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool,
modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
+ pairMap = make(map[string]string)
+ for k, v := range r.Header {
+ if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) {
+ pairMap[k] = v[0]
+ }
+ }
+
form, fe := r.MultipartReader()
if fe != nil {
glog.V(0).Infoln("MultipartReader [ERROR]", fe)
@@ -109,19 +120,19 @@ func ParseUpload(r *http.Request) (
}
isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
- isGzipped = false
+ dotIndex := strings.LastIndex(fileName, ".")
+ ext, mtype := "", ""
+ if dotIndex > 0 {
+ ext = strings.ToLower(fileName[dotIndex:])
+ mtype = mime.TypeByExtension(ext)
+ }
+ contentType := part.Header.Get("Content-Type")
+ if contentType != "" && mtype != contentType {
+ mimeType = contentType //only return mime type if not deductable
+ mtype = contentType
+ }
+
if !isChunkedFile {
- dotIndex := strings.LastIndex(fileName, ".")
- ext, mtype := "", ""
- if dotIndex > 0 {
- ext = strings.ToLower(fileName[dotIndex:])
- mtype = mime.TypeByExtension(ext)
- }
- contentType := part.Header.Get("Content-Type")
- if contentType != "" && mtype != contentType {
- mimeType = contentType //only return mime type if not deductable
- mtype = contentType
- }
if part.Header.Get("Content-Encoding") == "gzip" {
isGzipped = true
} else if operation.IsGzippable(ext, mtype) {
@@ -144,9 +155,10 @@ func ParseUpload(r *http.Request) (
return
}
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
+ var pairMap map[string]string
fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
n = new(Needle)
- fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
+ fname, n.Data, mimeType, pairMap, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
if e != nil {
return
}
@@ -158,6 +170,19 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
n.Mime = []byte(mimeType)
n.SetHasMime()
}
+ if len(pairMap) != 0 {
+ trimmedPairMap := make(map[string]string)
+ for k, v := range pairMap {
+ trimmedPairMap[k[len(PairNamePrefix):]] = v
+ }
+
+ pairs, _ := json.Marshal(trimmedPairMap)
+ if len(pairs) < 65536 {
+ n.Pairs = pairs
+ n.PairsSize = uint16(len(pairs))
+ n.SetHasPairs()
+ }
+ }
if isGzipped {
n.SetGzipped()
}
diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go
index 8baa325df..ff43effb3 100644
--- a/weed/storage/needle_read_write.go
+++ b/weed/storage/needle_read_write.go
@@ -16,6 +16,7 @@ const (
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
FlagHasTtl = 0x10
+ FlagHasPairs = 0x20
FlagIsChunkManifest = 0x80
LastModifiedBytesLength = 5
TtlBytesLength = 2
@@ -78,6 +79,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
if n.HasTtl() {
n.Size = n.Size + TtlBytesLength
}
+ if n.HasPairs() {
+ n.Size += 2 + uint32(n.PairsSize)
+ }
} else {
n.Size = 0
}
@@ -128,6 +132,15 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
return
}
}
+ if n.HasPairs() {
+ util.Uint16toBytes(header[0:2], n.PairsSize)
+ if _, err = w.Write(header[0:2]); err != nil {
+ return
+ }
+ if _, err = w.Write(n.Pairs); err != nil {
+ return
+ }
+ }
}
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
@@ -141,8 +154,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
}
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
- padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
- readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
+ NeedleWithoutPaddingSize := NeedleHeaderSize + size + NeedleChecksumSize
+ padding := NeedlePaddingSize - (NeedleWithoutPaddingSize % NeedlePaddingSize)
+ readSize := NeedleWithoutPaddingSize + padding
return getBytesForFileBlock(r, offset, int(readSize))
}
@@ -213,6 +227,13 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
index = index + TtlBytesLength
}
+ if index < lenBytes && n.HasPairs() {
+ n.PairsSize = util.BytesToUint16(bytes[index : index+2])
+ index += 2
+ end := index + int(n.PairsSize)
+ n.Pairs = bytes[index:end]
+ index = end
+ }
}
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
@@ -296,3 +317,11 @@ func (n *Needle) IsChunkedManifest() bool {
func (n *Needle) SetIsChunkManifest() {
n.Flags = n.Flags | FlagIsChunkManifest
}
+
+func (n *Needle) HasPairs() bool {
+ return n.Flags&FlagHasPairs != 0
+}
+
+func (n *Needle) SetHasPairs() {
+ n.Flags = n.Flags | FlagHasPairs
+}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 614c87ace..37a3904bd 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -303,6 +303,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
err = fmt.Errorf("Volume %d is read only", i)
return
}
+ // TODO: count needle size ahead
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
size, err = v.writeNeedle(n)
} else {
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index be5777167..aa312ac03 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -2,14 +2,14 @@ package topology
import (
"bytes"
+ "encoding/json"
"errors"
"fmt"
"net/http"
+ "net/url"
"strconv"
"strings"
- "net/url"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -55,9 +55,18 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
q.Set("cm", "true")
}
u.RawQuery = q.Encode()
+
+ pairMap := make(map[string]string)
+ if needle.HasPairs() {
+ err := json.Unmarshal(needle.Pairs, &pairMap)
+ if err != nil {
+ glog.V(0).Infoln("Unmarshal pairs error:", err)
+ }
+ }
+
_, err := operation.Upload(u.String(),
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
- jwt)
+ pairMap, jwt)
return err
}); err != nil {
ret = 0