Diffstat (limited to 'weed/operation')
-rw-r--r--  weed/operation/assign_file_id.go           48
-rw-r--r--  weed/operation/chunked_file.go            213
-rw-r--r--  weed/operation/compress.go                 63
-rw-r--r--  weed/operation/data_struts.go               7
-rw-r--r--  weed/operation/delete_content.go          120
-rw-r--r--  weed/operation/list_masters.go             32
-rw-r--r--  weed/operation/lookup.go                  118
-rw-r--r--  weed/operation/lookup_vid_cache.go         51
-rw-r--r--  weed/operation/lookup_vid_cache_test.go    26
-rw-r--r--  weed/operation/submit.go                  194
-rw-r--r--  weed/operation/sync_volume.go              54
-rw-r--r--  weed/operation/system_message.pb.go       203
-rw-r--r--  weed/operation/system_message_test.go      59
-rw-r--r--  weed/operation/upload_content.go           96
14 files changed, 1284 insertions, 0 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
new file mode 100644
index 000000000..acc2d3034
--- /dev/null
+++ b/weed/operation/assign_file_id.go
@@ -0,0 +1,48 @@
+package operation
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/url"
+ "strconv"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type AssignResult struct {
+ Fid string `json:"fid,omitempty"`
+ Url string `json:"url,omitempty"`
+ PublicUrl string `json:"publicUrl,omitempty"`
+ Count uint64 `json:"count,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+func Assign(server string, count uint64, replication string, collection string, ttl string) (*AssignResult, error) {
+ values := make(url.Values)
+ values.Add("count", strconv.FormatUint(count, 10))
+ if replication != "" {
+ values.Add("replication", replication)
+ }
+ if collection != "" {
+ values.Add("collection", collection)
+ }
+ if ttl != "" {
+ values.Add("ttl", ttl)
+ }
+ jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
+ glog.V(2).Info("assign result :", string(jsonBlob))
+ if err != nil {
+ return nil, err
+ }
+ var ret AssignResult
+ err = json.Unmarshal(jsonBlob, &ret)
+ if err != nil {
+ return nil, fmt.Errorf("/dir/assign result JSON unmarshal error:%v, json:%s", err, string(jsonBlob))
+ }
+ if ret.Count <= 0 {
+ return nil, errors.New(ret.Error)
+ }
+ return &ret, nil
+}
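
For illustration, a minimal caller of the new Assign helper could look like this sketch; the master address localhost:9333 and the "000" replication setting are placeholders, not part of this change:

package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	// ask the master (placeholder address) for one file id with no replication
	ret, err := operation.Assign("localhost:9333", 1, "000", "", "")
	if err != nil {
		log.Fatal(err)
	}
	// an upload would then be POSTed to http://<ret.Url>/<ret.Fid>
	fmt.Println(ret.Fid, ret.Url, ret.PublicUrl)
}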
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
new file mode 100644
index 000000000..52086514a
--- /dev/null
+++ b/weed/operation/chunked_file.go
@@ -0,0 +1,213 @@
+package operation
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "sort"
+
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ // ErrRangeRequestsNotSupported is returned when the remote server does not allow range requests (the Accept-Ranges header was not set)
+ ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server")
+ // ErrInvalidRange is returned by Read when trying to read past the end of the file
+ ErrInvalidRange = errors.New("Invalid range")
+)
+
+type ChunkInfo struct {
+ Fid string `json:"fid"`
+ Offset int64 `json:"offset"`
+ Size int64 `json:"size"`
+}
+
+type ChunkList []*ChunkInfo
+
+type ChunkManifest struct {
+ Name string `json:"name,omitempty"`
+ Mime string `json:"mime,omitempty"`
+ Size int64 `json:"size,omitempty"`
+ Chunks ChunkList `json:"chunks,omitempty"`
+}
+
+// ChunkedFileReader is a seekable reader over a chunked file.
+type ChunkedFileReader struct {
+ Manifest *ChunkManifest
+ Master string
+ pos int64
+ pr *io.PipeReader
+ pw *io.PipeWriter
+ mutex sync.Mutex
+}
+
+func (s ChunkList) Len() int { return len(s) }
+func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
+func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
+ if isGzipped {
+ var err error
+ if buffer, err = UnGzipData(buffer); err != nil {
+ return nil, err
+ }
+ }
+ cm := ChunkManifest{}
+ if e := json.Unmarshal(buffer, &cm); e != nil {
+ return nil, e
+ }
+ sort.Sort(cm.Chunks)
+ return &cm, nil
+}
+
+func (cm *ChunkManifest) Marshal() ([]byte, error) {
+ return json.Marshal(cm)
+}
+
+func (cm *ChunkManifest) DeleteChunks(master string) error {
+ deleteError := 0
+ for _, ci := range cm.Chunks {
+ if e := DeleteFile(master, ci.Fid, ""); e != nil {
+ deleteError++
+ glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master)
+ }
+ }
+ if deleteError > 0 {
+ return errors.New("Not all chunks deleted.")
+ }
+ return nil
+}
+
+func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) {
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return written, err
+ }
+ if offset > 0 {
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
+ }
+
+ resp, err := util.Do(req)
+ if err != nil {
+ return written, err
+ }
+ defer resp.Body.Close()
+
+ switch resp.StatusCode {
+ case http.StatusRequestedRangeNotSatisfiable:
+ return written, ErrInvalidRange
+ case http.StatusOK:
+ if offset > 0 {
+ return written, ErrRangeRequestsNotSupported
+ }
+ case http.StatusPartialContent:
+ break
+ default:
+ return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl)
+
+ }
+ return io.Copy(w, resp.Body)
+}
+
+func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
+ var err error
+ switch whence {
+ case 0:
+ case 1:
+ offset += cf.pos
+ case 2:
+ offset = cf.Manifest.Size - offset
+ }
+ if offset > cf.Manifest.Size {
+ err = ErrInvalidRange
+ }
+ if cf.pos != offset {
+ cf.Close()
+ }
+ cf.pos = offset
+ return cf.pos, err
+}
+
+func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
+ cm := cf.Manifest
+ chunkIndex := -1
+ chunkStartOffset := int64(0)
+ for i, ci := range cm.Chunks {
+ if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
+ chunkIndex = i
+ chunkStartOffset = cf.pos - ci.Offset
+ break
+ }
+ }
+ if chunkIndex < 0 {
+ return n, ErrInvalidRange
+ }
+ for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
+ ci := cm.Chunks[chunkIndex]
+ // TODO: should we read data from the local volume server first?
+ fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid)
+ if lookupError != nil {
+ return n, lookupError
+ }
+ if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset); e != nil {
+ return n, e
+ } else {
+ n += wn
+ cf.pos += wn
+ }
+
+ chunkStartOffset = 0
+ }
+ return n, nil
+}
+
+func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) {
+ cf.Seek(off, 0)
+ return cf.Read(p)
+}
+
+func (cf *ChunkedFileReader) Read(p []byte) (int, error) {
+ return cf.getPipeReader().Read(p)
+}
+
+func (cf *ChunkedFileReader) Close() (e error) {
+ cf.mutex.Lock()
+ defer cf.mutex.Unlock()
+ return cf.closePipe()
+}
+
+func (cf *ChunkedFileReader) closePipe() (e error) {
+ if cf.pr != nil {
+ if err := cf.pr.Close(); err != nil {
+ e = err
+ }
+ }
+ cf.pr = nil
+ if cf.pw != nil {
+ if err := cf.pw.Close(); err != nil {
+ e = err
+ }
+ }
+ cf.pw = nil
+ return e
+}
+
+func (cf *ChunkedFileReader) getPipeReader() io.Reader {
+ cf.mutex.Lock()
+ defer cf.mutex.Unlock()
+ if cf.pr != nil && cf.pw != nil {
+ return cf.pr
+ }
+ cf.closePipe()
+ cf.pr, cf.pw = io.Pipe()
+ go func(pw *io.PipeWriter) {
+ _, e := cf.WriteTo(pw)
+ pw.CloseWithError(e)
+ }(cf.pw)
+ return cf.pr
+}
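
To show how the pieces above fit together, here is a sketch that reads a chunked file end to end; the manifest.json path and the master address are assumptions for illustration:

package main

import (
	"io"
	"io/ioutil"
	"log"
	"os"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	// assume the chunk manifest JSON was saved locally (hypothetical file)
	manifestBytes, err := ioutil.ReadFile("manifest.json")
	if err != nil {
		log.Fatal(err)
	}
	cm, err := operation.LoadChunkManifest(manifestBytes, false)
	if err != nil {
		log.Fatal(err)
	}
	cf := &operation.ChunkedFileReader{Manifest: cm, Master: "localhost:9333"}
	defer cf.Close()
	// io.Copy takes the io.WriterTo fast path, so WriteTo streams each chunk
	// from its volume server without going through the internal pipe
	if _, err := io.Copy(os.Stdout, cf); err != nil {
		log.Fatal(err)
	}
}

Plain Read calls, by contrast, go through the pipe that getPipeReader sets up.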
diff --git a/weed/operation/compress.go b/weed/operation/compress.go
new file mode 100644
index 000000000..de62e5bf7
--- /dev/null
+++ b/weed/operation/compress.go
@@ -0,0 +1,63 @@
+package operation
+
+import (
+ "bytes"
+ "compress/flate"
+ "compress/gzip"
+ "io/ioutil"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+/*
+* The default is not to gzip, since gzip can be done on the client side.
+ */
+func IsGzippable(ext, mtype string) bool {
+ if strings.HasPrefix(mtype, "text/") {
+ return true
+ }
+ switch ext {
+ case ".zip", ".rar", ".gz", ".bz2", ".xz":
+ return false
+ case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json":
+ return true
+ }
+ if strings.HasPrefix(mtype, "application/") {
+ if strings.HasSuffix(mtype, "xml") {
+ return true
+ }
+ if strings.HasSuffix(mtype, "script") {
+ return true
+ }
+ }
+ return false
+}
+
+func GzipData(input []byte) ([]byte, error) {
+ buf := new(bytes.Buffer)
+ w, _ := gzip.NewWriterLevel(buf, flate.BestCompression)
+ if _, err := w.Write(input); err != nil {
+ glog.V(2).Infoln("error compressing data:", err)
+ return nil, err
+ }
+ if err := w.Close(); err != nil {
+ glog.V(2).Infoln("error closing compressed data:", err)
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+func UnGzipData(input []byte) ([]byte, error) {
+ buf := bytes.NewBuffer(input)
+ r, err := gzip.NewReader(buf)
+ if err != nil {
+ glog.V(2).Infoln("error creating gzip reader:", err)
+ return nil, err
+ }
+ defer r.Close()
+ output, err := ioutil.ReadAll(r)
+ if err != nil {
+ glog.V(2).Infoln("error uncompressing data:", err)
+ }
+ return output, err
+}
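
A round-trip sketch of the helpers above, with illustrative values:

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	data := []byte("hello, seaweedfs")
	// text/plain is gzippable per the rules above
	if operation.IsGzippable(".txt", "text/plain") {
		zipped, err := operation.GzipData(data)
		if err != nil {
			log.Fatal(err)
		}
		unzipped, err := operation.UnGzipData(zipped)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(bytes.Equal(data, unzipped)) // true
	}
}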
diff --git a/weed/operation/data_struts.go b/weed/operation/data_struts.go
new file mode 100644
index 000000000..bfc53aa50
--- /dev/null
+++ b/weed/operation/data_struts.go
@@ -0,0 +1,7 @@
+package operation
+
+type JoinResult struct {
+ VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
+ SecretKey string `json:"secretKey,omitempty"`
+ Error string `json:"error,omitempty"`
+}
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
new file mode 100644
index 000000000..b78221da1
--- /dev/null
+++ b/weed/operation/delete_content.go
@@ -0,0 +1,120 @@
+package operation
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/url"
+ "strings"
+ "sync"
+
+ "net/http"
+
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type DeleteResult struct {
+ Fid string `json:"fid"`
+ Size int `json:"size"`
+ Status int `json:"status"`
+ Error string `json:"error,omitempty"`
+}
+
+func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
+ fileUrl, err := LookupFileId(master, fileId)
+ if err != nil {
+ return fmt.Errorf("Failed to lookup %s:%v", fileId, err)
+ }
+ err = util.Delete(fileUrl, jwt)
+ if err != nil {
+ return fmt.Errorf("Failed to delete %s:%v", fileUrl, err)
+ }
+ return nil
+}
+
+func ParseFileId(fid string) (vid string, key_cookie string, err error) {
+ commaIndex := strings.Index(fid, ",")
+ if commaIndex <= 0 {
+ return "", "", errors.New("Wrong fid format.")
+ }
+ return fid[:commaIndex], fid[commaIndex+1:], nil
+}
+
+type DeleteFilesResult struct {
+ Errors []string
+ Results []DeleteResult
+}
+
+func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) {
+ vid_to_fileIds := make(map[string][]string)
+ ret := &DeleteFilesResult{}
+ var vids []string
+ for _, fileId := range fileIds {
+ vid, _, err := ParseFileId(fileId)
+ if err != nil {
+ ret.Results = append(ret.Results, DeleteResult{
+ Fid: vid,
+ Status: http.StatusBadRequest,
+ Error: err.Error()},
+ )
+ continue
+ }
+ if _, ok := vid_to_fileIds[vid]; !ok {
+ vid_to_fileIds[vid] = make([]string, 0)
+ vids = append(vids, vid)
+ }
+ vid_to_fileIds[vid] = append(vid_to_fileIds[vid], fileId)
+ }
+
+ lookupResults, err := LookupVolumeIds(master, vids)
+ if err != nil {
+ return ret, err
+ }
+
+ server_to_fileIds := make(map[string][]string)
+ for vid, result := range lookupResults {
+ if result.Error != "" {
+ ret.Errors = append(ret.Errors, result.Error)
+ continue
+ }
+ for _, location := range result.Locations {
+ if _, ok := server_to_fileIds[location.Url]; !ok {
+ server_to_fileIds[location.Url] = make([]string, 0)
+ }
+ server_to_fileIds[location.Url] = append(
+ server_to_fileIds[location.Url], vid_to_fileIds[vid]...)
+ }
+ }
+
+ var wg sync.WaitGroup
+ var ret_mutex sync.Mutex // guards ret.Errors and ret.Results, which the goroutines below append to
+
+ for server, fidList := range server_to_fileIds {
+ wg.Add(1)
+ go func(server string, fidList []string) {
+ defer wg.Done()
+ values := make(url.Values)
+ for _, fid := range fidList {
+ values.Add("fid", fid)
+ }
+ jsonBlob, err := util.Post("http://"+server+"/delete", values)
+ ret_mutex.Lock()
+ defer ret_mutex.Unlock()
+ if err != nil {
+ ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob))
+ return
+ }
+ var result []DeleteResult
+ err = json.Unmarshal(jsonBlob, &result)
+ if err != nil {
+ ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob))
+ return
+ }
+ ret.Results = append(ret.Results, result...)
+ }(server, fidList)
+ }
+ wg.Wait()
+
+ return ret, nil
+}
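
A sketch of batch deletion through DeleteFiles; the master address and the fids are placeholders. The fids are grouped by volume id, and one goroutine per volume server issues the actual /delete call:

package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	fids := []string{"3,01637037d6", "4,02a7303fd8"} // placeholder fids
	ret, err := operation.DeleteFiles("localhost:9333", fids)
	if err != nil {
		log.Fatal(err)
	}
	for _, r := range ret.Results {
		fmt.Println(r.Fid, r.Status, r.Error)
	}
	for _, e := range ret.Errors {
		fmt.Println("lookup/delete error:", e)
	}
}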
diff --git a/weed/operation/list_masters.go b/weed/operation/list_masters.go
new file mode 100644
index 000000000..0a15b0af8
--- /dev/null
+++ b/weed/operation/list_masters.go
@@ -0,0 +1,32 @@
+package operation
+
+import (
+ "encoding/json"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type ClusterStatusResult struct {
+ IsLeader bool `json:"IsLeader,omitempty"`
+ Leader string `json:"Leader,omitempty"`
+ Peers []string `json:"Peers,omitempty"`
+}
+
+func ListMasters(server string) ([]string, error) {
+ jsonBlob, err := util.Get("http://" + server + "/cluster/status")
+ glog.V(2).Info("list masters result :", string(jsonBlob))
+ if err != nil {
+ return nil, err
+ }
+ var ret ClusterStatusResult
+ err = json.Unmarshal(jsonBlob, &ret)
+ if err != nil {
+ return nil, err
+ }
+ masters := ret.Peers
+ if ret.IsLeader {
+ masters = append(masters, ret.Leader)
+ }
+ return masters, nil
+}
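
Usage sketch, assuming a master at a placeholder address; any master can be queried, and the reply lists the peers it knows about:

package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	masters, err := operation.ListMasters("localhost:9333")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(masters)
}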
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
new file mode 100644
index 000000000..19d9dbb94
--- /dev/null
+++ b/weed/operation/lookup.go
@@ -0,0 +1,118 @@
+package operation
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "math/rand"
+ "net/url"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type Location struct {
+ Url string `json:"url,omitempty"`
+ PublicUrl string `json:"publicUrl,omitempty"`
+}
+type LookupResult struct {
+ VolumeId string `json:"volumeId,omitempty"`
+ Locations []Location `json:"locations,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+func (lr *LookupResult) String() string {
+ return fmt.Sprintf("VolumeId:%s, Locations:%v, Error:%s", lr.VolumeId, lr.Locations, lr.Error)
+}
+
+var (
+ vc VidCache // cache of volume locations; entries are re-checked after 10 minutes
+)
+
+func Lookup(server string, vid string) (ret *LookupResult, err error) {
+ locations, cache_err := vc.Get(vid)
+ if cache_err != nil {
+ if ret, err = do_lookup(server, vid); err == nil {
+ vc.Set(vid, ret.Locations, 10*time.Minute)
+ }
+ } else {
+ ret = &LookupResult{VolumeId: vid, Locations: locations}
+ }
+ return
+}
+
+func do_lookup(server string, vid string) (*LookupResult, error) {
+ values := make(url.Values)
+ values.Add("volumeId", vid)
+ jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
+ if err != nil {
+ return nil, err
+ }
+ var ret LookupResult
+ err = json.Unmarshal(jsonBlob, &ret)
+ if err != nil {
+ return nil, err
+ }
+ if ret.Error != "" {
+ return nil, errors.New(ret.Error)
+ }
+ return &ret, nil
+}
+
+func LookupFileId(server string, fileId string) (fullUrl string, err error) {
+ parts := strings.Split(fileId, ",")
+ if len(parts) != 2 {
+ return "", errors.New("Invalid fileId " + fileId)
+ }
+ lookup, lookupError := Lookup(server, parts[0])
+ if lookupError != nil {
+ return "", lookupError
+ }
+ if len(lookup.Locations) == 0 {
+ return "", errors.New("File Not Found")
+ }
+ return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil
+}
+
+// LookupVolumeIds finds volume locations, first from the cache and then by querying the master for any still-unknown volume ids
+func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) {
+ ret := make(map[string]LookupResult)
+ var unknown_vids []string
+
+ //check vid cache first
+ for _, vid := range vids {
+ locations, cache_err := vc.Get(vid)
+ if cache_err == nil {
+ ret[vid] = LookupResult{VolumeId: vid, Locations: locations}
+ } else {
+ unknown_vids = append(unknown_vids, vid)
+ }
+ }
+ //return success if all volume ids are known
+ if len(unknown_vids) == 0 {
+ return ret, nil
+ }
+
+ //only query unknown_vids
+ values := make(url.Values)
+ for _, vid := range unknown_vids {
+ values.Add("volumeId", vid)
+ }
+ jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values)
+ if err != nil {
+ return nil, err
+ }
+ err = json.Unmarshal(jsonBlob, &ret)
+ if err != nil {
+ return nil, errors.New(err.Error() + " " + string(jsonBlob))
+ }
+
+ //set newly checked vids to cache
+ for _, vid := range unknown_vids {
+ locations := ret[vid].Locations
+ vc.Set(vid, locations, 10*time.Minute)
+ }
+
+ return ret, nil
+}
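
A sketch of resolving a fid to a fetchable URL; the fid is a placeholder. Repeated lookups for the same volume are answered from the 10-minute cache:

package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	// picks a random location when the volume is replicated
	fileUrl, err := operation.LookupFileId("localhost:9333", "3,01637037d6")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(fileUrl) // e.g. http://127.0.0.1:8080/3,01637037d6
}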
diff --git a/weed/operation/lookup_vid_cache.go b/weed/operation/lookup_vid_cache.go
new file mode 100644
index 000000000..1ed03613d
--- /dev/null
+++ b/weed/operation/lookup_vid_cache.go
@@ -0,0 +1,51 @@
+package operation
+
+import (
+ "errors"
+ "strconv"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type VidInfo struct {
+ Locations []Location
+ NextRefreshTime time.Time
+}
+type VidCache struct {
+ cache []VidInfo
+}
+
+func (vc *VidCache) Get(vid string) ([]Location, error) {
+ id, err := strconv.Atoi(vid)
+ if err != nil {
+ glog.V(1).Infof("Unknown volume id %s", vid)
+ return nil, err
+ }
+ if 0 < id && id <= len(vc.cache) {
+ if vc.cache[id-1].Locations == nil {
+ return nil, errors.New("Not Set")
+ }
+ if vc.cache[id-1].NextRefreshTime.Before(time.Now()) {
+ return nil, errors.New("Expired")
+ }
+ return vc.cache[id-1].Locations, nil
+ }
+ return nil, errors.New("Not Found")
+}
+func (vc *VidCache) Set(vid string, locations []Location, duration time.Duration) {
+ id, err := strconv.Atoi(vid)
+ if err != nil {
+ glog.V(1).Infof("Unknown volume id %s", vid)
+ return
+ }
+ if id > len(vc.cache) {
+ for i := id - len(vc.cache); i > 0; i-- {
+ vc.cache = append(vc.cache, VidInfo{})
+ }
+ }
+ if id > 0 {
+ vc.cache[id-1].Locations = locations
+ vc.cache[id-1].NextRefreshTime = time.Now().Add(duration)
+ }
+}
diff --git a/weed/operation/lookup_vid_cache_test.go b/weed/operation/lookup_vid_cache_test.go
new file mode 100644
index 000000000..9c9e2affb
--- /dev/null
+++ b/weed/operation/lookup_vid_cache_test.go
@@ -0,0 +1,26 @@
+package operation
+
+import (
+ "fmt"
+ "testing"
+ "time"
+)
+
+func TestCaching(t *testing.T) {
+ var (
+ vc VidCache
+ )
+ var locations []Location
+ locations = append(locations, Location{Url: "a.com:8080"})
+ vc.Set("123", locations, time.Second)
+ ret, _ := vc.Get("123")
+ if ret == nil {
+ t.Fatal("Not found vid 123")
+ }
+ fmt.Printf("vid 123 locations = %v\n", ret)
+ time.Sleep(2 * time.Second)
+ ret, _ = vc.Get("123")
+ if ret != nil {
+ t.Fatal("vid 123 should be expired")
+ }
+}
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
new file mode 100644
index 000000000..19bbd7a70
--- /dev/null
+++ b/weed/operation/submit.go
@@ -0,0 +1,194 @@
+package operation
+
+import (
+ "bytes"
+ "io"
+ "mime"
+ "net/url"
+ "os"
+ "path"
+ "strconv"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+type FilePart struct {
+ Reader io.Reader
+ FileName string
+ FileSize int64
+ IsGzipped bool
+ MimeType string
+ ModTime int64 //in seconds
+ Replication string
+ Collection string
+ Ttl string
+ Server string //this comes from assign result
+ Fid string //this comes from assign result, but customizable
+}
+
+type SubmitResult struct {
+ FileName string `json:"fileName,omitempty"`
+ FileUrl string `json:"fileUrl,omitempty"`
+ Fid string `json:"fid,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+func SubmitFiles(master string, files []FilePart,
+ replication string, collection string, ttl string, maxMB int,
+ secret security.Secret,
+) ([]SubmitResult, error) {
+ results := make([]SubmitResult, len(files))
+ for index, file := range files {
+ results[index].FileName = file.FileName
+ }
+ ret, err := Assign(master, uint64(len(files)), replication, collection, ttl)
+ if err != nil {
+ for index := range files {
+ results[index].Error = err.Error()
+ }
+ return results, err
+ }
+ for index, file := range files {
+ file.Fid = ret.Fid
+ if index > 0 {
+ file.Fid = file.Fid + "_" + strconv.Itoa(index)
+ }
+ file.Server = ret.Url
+ file.Replication = replication
+ file.Collection = collection
+ results[index].Size, err = file.Upload(maxMB, master, secret)
+ if err != nil {
+ results[index].Error = err.Error()
+ }
+ results[index].Fid = file.Fid
+ results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
+ }
+ return results, nil
+}
+
+func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
+ ret = make([]FilePart, len(fullPathFilenames))
+ for index, file := range fullPathFilenames {
+ if ret[index], err = newFilePart(file); err != nil {
+ return
+ }
+ }
+ return
+}
+func newFilePart(fullPathFilename string) (ret FilePart, err error) {
+ fh, openErr := os.Open(fullPathFilename)
+ if openErr != nil {
+ glog.V(0).Info("Failed to open file: ", fullPathFilename)
+ return ret, openErr
+ }
+ ret.Reader = fh
+
+ if fi, fiErr := fh.Stat(); fiErr != nil {
+ glog.V(0).Info("Failed to stat file:", fullPathFilename)
+ return ret, fiErr
+ } else {
+ ret.ModTime = fi.ModTime().UTC().Unix()
+ ret.FileSize = fi.Size()
+ }
+ ext := strings.ToLower(path.Ext(fullPathFilename))
+ ret.IsGzipped = ext == ".gz"
+ ret.FileName = fullPathFilename
+ if ret.IsGzipped {
+ ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3]
+ }
+ if ext != "" {
+ ret.MimeType = mime.TypeByExtension(ext)
+ }
+
+ return ret, nil
+}
+
+func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
+ jwt := security.GenJwt(secret, fi.Fid)
+ fileUrl := "http://" + fi.Server + "/" + fi.Fid
+ if fi.ModTime != 0 {
+ fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
+ }
+ if closer, ok := fi.Reader.(io.Closer); ok {
+ defer closer.Close()
+ }
+ baseName := path.Base(fi.FileName)
+ if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
+ chunkSize := int64(maxMB * 1024 * 1024)
+ chunks := fi.FileSize/chunkSize + 1
+ cm := ChunkManifest{
+ Name: baseName,
+ Size: fi.FileSize,
+ Mime: fi.MimeType,
+ Chunks: make([]*ChunkInfo, 0, chunks),
+ }
+
+ for i := int64(0); i < chunks; i++ {
+ id, count, e := upload_one_chunk(
+ baseName+"-"+strconv.FormatInt(i+1, 10),
+ io.LimitReader(fi.Reader, chunkSize),
+ master, fi.Replication, fi.Collection, fi.Ttl,
+ jwt)
+ if e != nil {
+ // delete all uploaded chunks
+ cm.DeleteChunks(master)
+ return 0, e
+ }
+ cm.Chunks = append(cm.Chunks,
+ &ChunkInfo{
+ Offset: i * chunkSize,
+ Size: int64(count),
+ Fid: id,
+ },
+ )
+ retSize += count
+ }
+ err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
+ if err != nil {
+ // delete all uploaded chunks
+ cm.DeleteChunks(master)
+ }
+ } else {
+ ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
+ if e != nil {
+ return 0, e
+ }
+ return ret.Size, e
+ }
+ return
+}
+
+func upload_one_chunk(filename string, reader io.Reader, master,
+ replication string, collection string, ttl string, jwt security.EncodedJwt,
+) (fid string, size uint32, e error) {
+ ret, err := Assign(master, 1, replication, collection, ttl)
+ if err != nil {
+ return "", 0, err
+ }
+ fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
+ glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
+ uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
+ "application/octet-stream", jwt)
+ if uploadError != nil {
+ return fid, 0, uploadError
+ }
+ return fid, uploadResult.Size, nil
+}
+
+func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
+ buf, e := manifest.Marshal()
+ if e != nil {
+ return e
+ }
+ bufReader := bytes.NewReader(buf)
+ glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
+ u, _ := url.Parse(fileUrl)
+ q := u.Query()
+ q.Set("cm", "true")
+ u.RawQuery = q.Encode()
+ _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt)
+ return e
+}
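
A sketch of the full submit path; the local file path and master address are placeholders. With maxMB > 0, any file larger than that is split into chunks and a chunk manifest is uploaded in its place:

package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	parts, err := operation.NewFileParts([]string{"/tmp/hello.txt"})
	if err != nil {
		log.Fatal(err)
	}
	// empty replication/collection/ttl use the master defaults; no secret
	results, err := operation.SubmitFiles("localhost:9333", parts, "", "", "", 32, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, r := range results {
		fmt.Println(r.FileName, r.Fid, r.FileUrl, r.Error)
	}
}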
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
new file mode 100644
index 000000000..b7a727fc7
--- /dev/null
+++ b/weed/operation/sync_volume.go
@@ -0,0 +1,54 @@
+package operation
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/url"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type SyncVolumeResponse struct {
+ Replication string `json:"Replication,omitempty"`
+ Ttl string `json:"Ttl,omitempty"`
+ TailOffset uint64 `json:"TailOffset,omitempty"`
+ CompactRevision uint16 `json:"CompactRevision,omitempty"`
+ IdxFileSize uint64 `json:"IdxFileSize,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) {
+ values := make(url.Values)
+ values.Add("volume", vid)
+ jsonBlob, err := util.Post("http://"+server+"/admin/sync/status", values)
+ glog.V(2).Info("sync volume result :", string(jsonBlob))
+ if err != nil {
+ return nil, err
+ }
+ var ret SyncVolumeResponse
+ err = json.Unmarshal(jsonBlob, &ret)
+ if err != nil {
+ return nil, err
+ }
+ if ret.Error != "" {
+ return nil, fmt.Errorf("Volume %s get sync status error: %s", vid, ret.Error)
+ }
+ return &ret, nil
+}
+
+func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key uint64, offset, size uint32)) error {
+ values := make(url.Values)
+ values.Add("volume", vid)
+ line := make([]byte, 16)
+ err := util.GetBufferStream("http://"+server+"/admin/sync/index", values, line, func(bytes []byte) {
+ key := util.BytesToUint64(bytes[:8])
+ offset := util.BytesToUint32(bytes[8:12])
+ size := util.BytesToUint32(bytes[12:16])
+ eachEntryFn(key, offset, size)
+ })
+ if err != nil {
+ return err
+ }
+ return nil
+}
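
A sketch of walking a volume's index; the volume server address and volume id are placeholders. Each 16-byte entry decodes into an 8-byte needle key, a 4-byte offset, and a 4-byte size:

package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	err := operation.GetVolumeIdxEntries("localhost:8080", "3",
		func(key uint64, offset, size uint32) {
			fmt.Println(key, offset, size)
		})
	if err != nil {
		log.Fatal(err)
	}
}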
diff --git a/weed/operation/system_message.pb.go b/weed/operation/system_message.pb.go
new file mode 100644
index 000000000..742a1ca4e
--- /dev/null
+++ b/weed/operation/system_message.pb.go
@@ -0,0 +1,203 @@
+// Code generated by protoc-gen-go.
+// source: system_message.proto
+// DO NOT EDIT!
+
+/*
+Package operation is a generated protocol buffer package.
+
+It is generated from these files:
+ system_message.proto
+
+It has these top-level messages:
+ VolumeInformationMessage
+ JoinMessage
+*/
+package operation
+
+import proto "github.com/golang/protobuf/proto"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = math.Inf
+
+type VolumeInformationMessage struct {
+ Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"`
+ Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"`
+ Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
+ FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"`
+ DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"`
+ DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"`
+ ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
+ ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"`
+ Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
+ Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} }
+func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) }
+func (*VolumeInformationMessage) ProtoMessage() {}
+
+const Default_VolumeInformationMessage_Version uint32 = 2
+
+func (m *VolumeInformationMessage) GetId() uint32 {
+ if m != nil && m.Id != nil {
+ return *m.Id
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetSize() uint64 {
+ if m != nil && m.Size != nil {
+ return *m.Size
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetCollection() string {
+ if m != nil && m.Collection != nil {
+ return *m.Collection
+ }
+ return ""
+}
+
+func (m *VolumeInformationMessage) GetFileCount() uint64 {
+ if m != nil && m.FileCount != nil {
+ return *m.FileCount
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetDeleteCount() uint64 {
+ if m != nil && m.DeleteCount != nil {
+ return *m.DeleteCount
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 {
+ if m != nil && m.DeletedByteCount != nil {
+ return *m.DeletedByteCount
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetReadOnly() bool {
+ if m != nil && m.ReadOnly != nil {
+ return *m.ReadOnly
+ }
+ return false
+}
+
+func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 {
+ if m != nil && m.ReplicaPlacement != nil {
+ return *m.ReplicaPlacement
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetVersion() uint32 {
+ if m != nil && m.Version != nil {
+ return *m.Version
+ }
+ return Default_VolumeInformationMessage_Version
+}
+
+func (m *VolumeInformationMessage) GetTtl() uint32 {
+ if m != nil && m.Ttl != nil {
+ return *m.Ttl
+ }
+ return 0
+}
+
+type JoinMessage struct {
+ IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"`
+ Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"`
+ Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"`
+ PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"`
+ MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"`
+ MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"`
+ DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"`
+ Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
+ Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
+ AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *JoinMessage) Reset() { *m = JoinMessage{} }
+func (m *JoinMessage) String() string { return proto.CompactTextString(m) }
+func (*JoinMessage) ProtoMessage() {}
+
+func (m *JoinMessage) GetIsInit() bool {
+ if m != nil && m.IsInit != nil {
+ return *m.IsInit
+ }
+ return false
+}
+
+func (m *JoinMessage) GetIp() string {
+ if m != nil && m.Ip != nil {
+ return *m.Ip
+ }
+ return ""
+}
+
+func (m *JoinMessage) GetPort() uint32 {
+ if m != nil && m.Port != nil {
+ return *m.Port
+ }
+ return 0
+}
+
+func (m *JoinMessage) GetPublicUrl() string {
+ if m != nil && m.PublicUrl != nil {
+ return *m.PublicUrl
+ }
+ return ""
+}
+
+func (m *JoinMessage) GetMaxVolumeCount() uint32 {
+ if m != nil && m.MaxVolumeCount != nil {
+ return *m.MaxVolumeCount
+ }
+ return 0
+}
+
+func (m *JoinMessage) GetMaxFileKey() uint64 {
+ if m != nil && m.MaxFileKey != nil {
+ return *m.MaxFileKey
+ }
+ return 0
+}
+
+func (m *JoinMessage) GetDataCenter() string {
+ if m != nil && m.DataCenter != nil {
+ return *m.DataCenter
+ }
+ return ""
+}
+
+func (m *JoinMessage) GetRack() string {
+ if m != nil && m.Rack != nil {
+ return *m.Rack
+ }
+ return ""
+}
+
+func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage {
+ if m != nil {
+ return m.Volumes
+ }
+ return nil
+}
+
+func (m *JoinMessage) GetAdminPort() uint32 {
+ if m != nil && m.AdminPort != nil {
+ return *m.AdminPort
+ }
+ return 0
+}
+
+func init() {
+}
diff --git a/weed/operation/system_message_test.go b/weed/operation/system_message_test.go
new file mode 100644
index 000000000..d18ca49a4
--- /dev/null
+++ b/weed/operation/system_message_test.go
@@ -0,0 +1,59 @@
+package operation
+
+import (
+ "encoding/json"
+ "log"
+ "testing"
+
+ "github.com/golang/protobuf/proto"
+)
+
+func TestSerialDeserial(t *testing.T) {
+ volumeMessage := &VolumeInformationMessage{
+ Id: proto.Uint32(12),
+ Size: proto.Uint64(2341234),
+ Collection: proto.String("benchmark"),
+ FileCount: proto.Uint64(2341234),
+ DeleteCount: proto.Uint64(234),
+ DeletedByteCount: proto.Uint64(21234),
+ ReadOnly: proto.Bool(false),
+ ReplicaPlacement: proto.Uint32(210),
+ Version: proto.Uint32(2),
+ }
+ var volumeMessages []*VolumeInformationMessage
+ volumeMessages = append(volumeMessages, volumeMessage)
+
+ joinMessage := &JoinMessage{
+ IsInit: proto.Bool(true),
+ Ip: proto.String("127.0.3.12"),
+ Port: proto.Uint32(34546),
+ PublicUrl: proto.String("localhost:2342"),
+ MaxVolumeCount: proto.Uint32(210),
+ MaxFileKey: proto.Uint64(324234423),
+ DataCenter: proto.String("dc1"),
+ Rack: proto.String("rack2"),
+ Volumes: volumeMessages,
+ }
+
+ data, err := proto.Marshal(joinMessage)
+ if err != nil {
+ log.Fatal("marshaling error: ", err)
+ }
+ newMessage := &JoinMessage{}
+ err = proto.Unmarshal(data, newMessage)
+ if err != nil {
+ log.Fatal("unmarshaling error: ", err)
+ }
+ log.Println("The pb data size is", len(data))
+
+ jsonData, jsonError := json.Marshal(joinMessage)
+ if jsonError != nil {
+ log.Fatal("json marshaling error: ", jsonError)
+ }
+ log.Println("The json data size is", len(jsonData), string(jsonData))
+
+ // Now joinMessage and newMessage contain the same data.
+ if *joinMessage.PublicUrl != *newMessage.PublicUrl {
+ log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl)
+ }
+}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
new file mode 100644
index 000000000..a87784cad
--- /dev/null
+++ b/weed/operation/upload_content.go
@@ -0,0 +1,96 @@
+package operation
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "mime"
+ "mime/multipart"
+ "net/http"
+ "net/textproto"
+ "path/filepath"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+type UploadResult struct {
+ Name string `json:"name,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+var (
+ client *http.Client
+)
+
+func init() {
+ client = &http.Client{Transport: &http.Transport{
+ MaxIdleConnsPerHost: 1024,
+ }}
+}
+
+var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
+
+func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
+ return upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = io.Copy(w, reader)
+ return
+ }, filename, isGzipped, mtype, jwt)
+}
+func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
+ body_buf := bytes.NewBufferString("")
+ body_writer := multipart.NewWriter(body_buf)
+ h := make(textproto.MIMEHeader)
+ h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename)))
+ if mtype == "" {
+ mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename)))
+ }
+ if mtype != "" {
+ h.Set("Content-Type", mtype)
+ }
+ if isGzipped {
+ h.Set("Content-Encoding", "gzip")
+ }
+ if jwt != "" {
+ h.Set("Authorization", "BEARER "+string(jwt))
+ }
+ file_writer, cp_err := body_writer.CreatePart(h)
+ if cp_err != nil {
+ glog.V(0).Infoln("error creating form file", cp_err.Error())
+ return nil, cp_err
+ }
+ if err := fillBufferFunction(file_writer); err != nil {
+ glog.V(0).Infoln("error copying data", err)
+ return nil, err
+ }
+ content_type := body_writer.FormDataContentType()
+ if err := body_writer.Close(); err != nil {
+ glog.V(0).Infoln("error closing body", err)
+ return nil, err
+ }
+ resp, post_err := client.Post(uploadUrl, content_type, body_buf)
+ if post_err != nil {
+ glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
+ return nil, post_err
+ }
+ defer resp.Body.Close()
+ resp_body, ra_err := ioutil.ReadAll(resp.Body)
+ if ra_err != nil {
+ return nil, ra_err
+ }
+ var ret UploadResult
+ unmarshal_err := json.Unmarshal(resp_body, &ret)
+ if unmarshal_err != nil {
+ glog.V(0).Infoln("failing to read upload resonse", uploadUrl, string(resp_body))
+ return nil, unmarshal_err
+ }
+ if ret.Error != "" {
+ return nil, errors.New(ret.Error)
+ }
+ return &ret, nil
+}
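
Finally, a direct-upload sketch; the upload URL would normally be built from an Assign result, so the address and fid below are placeholders:

package main

import (
	"fmt"
	"log"
	"strings"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	uploadUrl := "http://localhost:8080/3,01637037d6" // placeholder volume server + fid
	ret, err := operation.Upload(uploadUrl, "hello.txt",
		strings.NewReader("hello, seaweedfs"), false, "text/plain", "")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(ret.Name, ret.Size)
}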