diff options
| author | Chris Lu <chris.lu@gmail.com> | 2016-06-02 18:09:14 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2016-06-02 18:09:14 -0700 |
| commit | 5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44 (patch) | |
| tree | 2e4dd2ad0a618ab2b7cdebcdb9c503526c31e2e8 /go/operation | |
| parent | caeffa3998adc060fa66c4cd77af971ff2d26c57 (diff) | |
| download | seaweedfs-5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44.tar.xz seaweedfs-5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44.zip | |
directory structure change to work with glide
glide has its own requirements. My previous workaround caused me some
code checkin errors. Need to fix this.
Diffstat (limited to 'go/operation')
| -rw-r--r-- | go/operation/assign_file_id.go | 48 | ||||
| -rw-r--r-- | go/operation/chunked_file.go | 213 | ||||
| -rw-r--r-- | go/operation/compress.go | 59 | ||||
| -rw-r--r-- | go/operation/data_struts.go | 7 | ||||
| -rw-r--r-- | go/operation/delete_content.go | 117 | ||||
| -rw-r--r-- | go/operation/list_masters.go | 32 | ||||
| -rw-r--r-- | go/operation/lookup.go | 118 | ||||
| -rw-r--r-- | go/operation/lookup_vid_cache.go | 51 | ||||
| -rw-r--r-- | go/operation/lookup_vid_cache_test.go | 26 | ||||
| -rw-r--r-- | go/operation/submit.go | 194 | ||||
| -rw-r--r-- | go/operation/sync_volume.go | 54 | ||||
| -rw-r--r-- | go/operation/system_message.pb.go | 203 | ||||
| -rw-r--r-- | go/operation/system_message_test.go | 59 | ||||
| -rw-r--r-- | go/operation/upload_content.go | 96 |
14 files changed, 0 insertions, 1277 deletions
diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go deleted file mode 100644 index fa436b651..000000000 --- a/go/operation/assign_file_id.go +++ /dev/null @@ -1,48 +0,0 @@ -package operation - -import ( - "encoding/json" - "errors" - "fmt" - "net/url" - "strconv" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" -) - -type AssignResult struct { - Fid string `json:"fid,omitempty"` - Url string `json:"url,omitempty"` - PublicUrl string `json:"publicUrl,omitempty"` - Count uint64 `json:"count,omitempty"` - Error string `json:"error,omitempty"` -} - -func Assign(server string, count uint64, replication string, collection string, ttl string) (*AssignResult, error) { - values := make(url.Values) - values.Add("count", strconv.FormatUint(count, 10)) - if replication != "" { - values.Add("replication", replication) - } - if collection != "" { - values.Add("collection", collection) - } - if ttl != "" { - values.Add("ttl", ttl) - } - jsonBlob, err := util.Post("http://"+server+"/dir/assign", values) - glog.V(2).Info("assign result :", string(jsonBlob)) - if err != nil { - return nil, err - } - var ret AssignResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, fmt.Errorf("/dir/assign result JSON unmarshal error:%v, json:%s", err, string(jsonBlob)) - } - if ret.Count <= 0 { - return nil, errors.New(ret.Error) - } - return &ret, nil -} diff --git a/go/operation/chunked_file.go b/go/operation/chunked_file.go deleted file mode 100644 index 786b8a989..000000000 --- a/go/operation/chunked_file.go +++ /dev/null @@ -1,213 +0,0 @@ -package operation - -import ( - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - "sort" - - "sync" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" -) - -var ( - // when the remote server does not allow range requests (Accept-Ranges was not set) - ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server") - // ErrInvalidRange is returned by Read when trying to read past the end of the file - ErrInvalidRange = errors.New("Invalid range") -) - -type ChunkInfo struct { - Fid string `json:"fid"` - Offset int64 `json:"offset"` - Size int64 `json:"size"` -} - -type ChunkList []*ChunkInfo - -type ChunkManifest struct { - Name string `json:"name,omitempty"` - Mime string `json:"mime,omitempty"` - Size int64 `json:"size,omitempty"` - Chunks ChunkList `json:"chunks,omitempty"` -} - -// seekable chunked file reader -type ChunkedFileReader struct { - Manifest *ChunkManifest - Master string - pos int64 - pr *io.PipeReader - pw *io.PipeWriter - mutex sync.Mutex -} - -func (s ChunkList) Len() int { return len(s) } -func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset } -func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) { - if isGzipped { - var err error - if buffer, err = UnGzipData(buffer); err != nil { - return nil, err - } - } - cm := ChunkManifest{} - if e := json.Unmarshal(buffer, &cm); e != nil { - return nil, e - } - sort.Sort(cm.Chunks) - return &cm, nil -} - -func (cm *ChunkManifest) Marshal() ([]byte, error) { - return json.Marshal(cm) -} - -func (cm *ChunkManifest) DeleteChunks(master string) error { - deleteError := 0 - for _, ci := range cm.Chunks { - if e := DeleteFile(master, ci.Fid, ""); e != nil { - deleteError++ - glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master) - } - } - if deleteError > 0 { - return errors.New("Not all chunks deleted.") - } - return nil -} - -func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) { - req, err := http.NewRequest("GET", fileUrl, nil) - if err != nil { - return written, err - } - if offset > 0 { - req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) - } - - resp, err := util.Do(req) - if err != nil { - return written, err - } - defer resp.Body.Close() - - switch resp.StatusCode { - case http.StatusRequestedRangeNotSatisfiable: - return written, ErrInvalidRange - case http.StatusOK: - if offset > 0 { - return written, ErrRangeRequestsNotSupported - } - case http.StatusPartialContent: - break - default: - return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl) - - } - return io.Copy(w, resp.Body) -} - -func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) { - var err error - switch whence { - case 0: - case 1: - offset += cf.pos - case 2: - offset = cf.Manifest.Size - offset - } - if offset > cf.Manifest.Size { - err = ErrInvalidRange - } - if cf.pos != offset { - cf.Close() - } - cf.pos = offset - return cf.pos, err -} - -func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { - cm := cf.Manifest - chunkIndex := -1 - chunkStartOffset := int64(0) - for i, ci := range cm.Chunks { - if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size { - chunkIndex = i - chunkStartOffset = cf.pos - ci.Offset - break - } - } - if chunkIndex < 0 { - return n, ErrInvalidRange - } - for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ { - ci := cm.Chunks[chunkIndex] - // if we need read date from local volume server first? - fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid) - if lookupError != nil { - return n, lookupError - } - if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset); e != nil { - return n, e - } else { - n += wn - cf.pos += wn - } - - chunkStartOffset = 0 - } - return n, nil -} - -func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) { - cf.Seek(off, 0) - return cf.Read(p) -} - -func (cf *ChunkedFileReader) Read(p []byte) (int, error) { - return cf.getPipeReader().Read(p) -} - -func (cf *ChunkedFileReader) Close() (e error) { - cf.mutex.Lock() - defer cf.mutex.Unlock() - return cf.closePipe() -} - -func (cf *ChunkedFileReader) closePipe() (e error) { - if cf.pr != nil { - if err := cf.pr.Close(); err != nil { - e = err - } - } - cf.pr = nil - if cf.pw != nil { - if err := cf.pw.Close(); err != nil { - e = err - } - } - cf.pw = nil - return e -} - -func (cf *ChunkedFileReader) getPipeReader() io.Reader { - cf.mutex.Lock() - defer cf.mutex.Unlock() - if cf.pr != nil && cf.pw != nil { - return cf.pr - } - cf.closePipe() - cf.pr, cf.pw = io.Pipe() - go func(pw *io.PipeWriter) { - _, e := cf.WriteTo(pw) - pw.CloseWithError(e) - }(cf.pw) - return cf.pr -} diff --git a/go/operation/compress.go b/go/operation/compress.go deleted file mode 100644 index b1105ba4b..000000000 --- a/go/operation/compress.go +++ /dev/null @@ -1,59 +0,0 @@ -package operation - -import ( - "bytes" - "compress/flate" - "compress/gzip" - "io/ioutil" - "strings" - - "github.com/chrislusf/seaweedfs/go/glog" -) - -/* -* Default more not to gzip since gzip can be done on client side. - */ -func IsGzippable(ext, mtype string) bool { - if strings.HasPrefix(mtype, "text/") { - return true - } - switch ext { - case ".zip", ".rar", ".gz", ".bz2", ".xz": - return false - case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json": - return true - } - if strings.HasPrefix(mtype, "application/") { - if strings.HasSuffix(mtype, "xml") { - return true - } - if strings.HasSuffix(mtype, "script") { - return true - } - } - return false -} - -func GzipData(input []byte) ([]byte, error) { - buf := new(bytes.Buffer) - w, _ := gzip.NewWriterLevel(buf, flate.BestCompression) - if _, err := w.Write(input); err != nil { - glog.V(2).Infoln("error compressing data:", err) - return nil, err - } - if err := w.Close(); err != nil { - glog.V(2).Infoln("error closing compressed data:", err) - return nil, err - } - return buf.Bytes(), nil -} -func UnGzipData(input []byte) ([]byte, error) { - buf := bytes.NewBuffer(input) - r, _ := gzip.NewReader(buf) - defer r.Close() - output, err := ioutil.ReadAll(r) - if err != nil { - glog.V(2).Infoln("error uncompressing data:", err) - } - return output, err -} diff --git a/go/operation/data_struts.go b/go/operation/data_struts.go deleted file mode 100644 index bfc53aa50..000000000 --- a/go/operation/data_struts.go +++ /dev/null @@ -1,7 +0,0 @@ -package operation - -type JoinResult struct { - VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"` - SecretKey string `json:"secretKey,omitempty"` - Error string `json:"error,omitempty"` -} diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go deleted file mode 100644 index c808cc75a..000000000 --- a/go/operation/delete_content.go +++ /dev/null @@ -1,117 +0,0 @@ -package operation - -import ( - "encoding/json" - "errors" - "fmt" - "net/url" - "strings" - "sync" - - "net/http" - - "github.com/chrislusf/seaweedfs/go/security" - "github.com/chrislusf/seaweedfs/go/util" -) - -type DeleteResult struct { - Fid string `json:"fid"` - Size int `json:"size"` - Status int `json:"status"` - Error string `json:"error,omitempty"` -} - -func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error { - fileUrl, err := LookupFileId(master, fileId) - if err != nil { - return fmt.Errorf("Failed to lookup %s:%v", fileId, err) - } - err = util.Delete(fileUrl, jwt) - if err != nil { - return fmt.Errorf("Failed to delete %s:%v", fileUrl, err) - } - return nil -} - -func ParseFileId(fid string) (vid string, key_cookie string, err error) { - commaIndex := strings.Index(fid, ",") - if commaIndex <= 0 { - return "", "", errors.New("Wrong fid format.") - } - return fid[:commaIndex], fid[commaIndex+1:], nil -} - -type DeleteFilesResult struct { - Errors []string - Results []DeleteResult -} - -func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { - vid_to_fileIds := make(map[string][]string) - ret := &DeleteFilesResult{} - var vids []string - for _, fileId := range fileIds { - vid, _, err := ParseFileId(fileId) - if err != nil { - ret.Results = append(ret.Results, DeleteResult{ - Fid: vid, - Status: http.StatusBadRequest, - Error: err.Error()}, - ) - continue - } - if _, ok := vid_to_fileIds[vid]; !ok { - vid_to_fileIds[vid] = make([]string, 0) - vids = append(vids, vid) - } - vid_to_fileIds[vid] = append(vid_to_fileIds[vid], fileId) - } - - lookupResults, err := LookupVolumeIds(master, vids) - if err != nil { - return ret, err - } - - server_to_fileIds := make(map[string][]string) - for vid, result := range lookupResults { - if result.Error != "" { - ret.Errors = append(ret.Errors, result.Error) - continue - } - for _, location := range result.Locations { - if _, ok := server_to_fileIds[location.Url]; !ok { - server_to_fileIds[location.Url] = make([]string, 0) - } - server_to_fileIds[location.Url] = append( - server_to_fileIds[location.Url], vid_to_fileIds[vid]...) - } - } - - var wg sync.WaitGroup - - for server, fidList := range server_to_fileIds { - wg.Add(1) - go func(server string, fidList []string) { - defer wg.Done() - values := make(url.Values) - for _, fid := range fidList { - values.Add("fid", fid) - } - jsonBlob, err := util.Post("http://"+server+"/delete", values) - if err != nil { - ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) - return - } - var result []DeleteResult - err = json.Unmarshal(jsonBlob, &result) - if err != nil { - ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) - return - } - ret.Results = append(ret.Results, result...) - }(server, fidList) - } - wg.Wait() - - return ret, nil -} diff --git a/go/operation/list_masters.go b/go/operation/list_masters.go deleted file mode 100644 index bda6f3c65..000000000 --- a/go/operation/list_masters.go +++ /dev/null @@ -1,32 +0,0 @@ -package operation - -import ( - "encoding/json" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" -) - -type ClusterStatusResult struct { - IsLeader bool `json:"IsLeader,omitempty"` - Leader string `json:"Leader,omitempty"` - Peers []string `json:"Peers,omitempty"` -} - -func ListMasters(server string) ([]string, error) { - jsonBlob, err := util.Get("http://" + server + "/cluster/status") - glog.V(2).Info("list masters result :", string(jsonBlob)) - if err != nil { - return nil, err - } - var ret ClusterStatusResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - masters := ret.Peers - if ret.IsLeader { - masters = append(masters, ret.Leader) - } - return masters, nil -} diff --git a/go/operation/lookup.go b/go/operation/lookup.go deleted file mode 100644 index f77d1ec9b..000000000 --- a/go/operation/lookup.go +++ /dev/null @@ -1,118 +0,0 @@ -package operation - -import ( - "encoding/json" - "errors" - "fmt" - "math/rand" - "net/url" - "strings" - "time" - - "github.com/chrislusf/seaweedfs/go/util" -) - -type Location struct { - Url string `json:"url,omitempty"` - PublicUrl string `json:"publicUrl,omitempty"` -} -type LookupResult struct { - VolumeId string `json:"volumeId,omitempty"` - Locations []Location `json:"locations,omitempty"` - Error string `json:"error,omitempty"` -} - -func (lr *LookupResult) String() string { - return fmt.Sprintf("VolumeId:%s, Locations:%v, Error:%s", lr.VolumeId, lr.Locations, lr.Error) -} - -var ( - vc VidCache // caching of volume locations, re-check if after 10 minutes -) - -func Lookup(server string, vid string) (ret *LookupResult, err error) { - locations, cache_err := vc.Get(vid) - if cache_err != nil { - if ret, err = do_lookup(server, vid); err == nil { - vc.Set(vid, ret.Locations, 10*time.Minute) - } - } else { - ret = &LookupResult{VolumeId: vid, Locations: locations} - } - return -} - -func do_lookup(server string, vid string) (*LookupResult, error) { - values := make(url.Values) - values.Add("volumeId", vid) - jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) - if err != nil { - return nil, err - } - var ret LookupResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Error != "" { - return nil, errors.New(ret.Error) - } - return &ret, nil -} - -func LookupFileId(server string, fileId string) (fullUrl string, err error) { - parts := strings.Split(fileId, ",") - if len(parts) != 2 { - return "", errors.New("Invalid fileId " + fileId) - } - lookup, lookupError := Lookup(server, parts[0]) - if lookupError != nil { - return "", lookupError - } - if len(lookup.Locations) == 0 { - return "", errors.New("File Not Found") - } - return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil -} - -// LookupVolumeIds find volume locations by cache and actual lookup -func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) { - ret := make(map[string]LookupResult) - var unknown_vids []string - - //check vid cache first - for _, vid := range vids { - locations, cache_err := vc.Get(vid) - if cache_err == nil { - ret[vid] = LookupResult{VolumeId: vid, Locations: locations} - } else { - unknown_vids = append(unknown_vids, vid) - } - } - //return success if all volume ids are known - if len(unknown_vids) == 0 { - return ret, nil - } - - //only query unknown_vids - values := make(url.Values) - for _, vid := range unknown_vids { - values.Add("volumeId", vid) - } - jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values) - if err != nil { - return nil, err - } - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, errors.New(err.Error() + " " + string(jsonBlob)) - } - - //set newly checked vids to cache - for _, vid := range unknown_vids { - locations := ret[vid].Locations - vc.Set(vid, locations, 10*time.Minute) - } - - return ret, nil -} diff --git a/go/operation/lookup_vid_cache.go b/go/operation/lookup_vid_cache.go deleted file mode 100644 index ac4240102..000000000 --- a/go/operation/lookup_vid_cache.go +++ /dev/null @@ -1,51 +0,0 @@ -package operation - -import ( - "errors" - "strconv" - "time" - - "github.com/chrislusf/seaweedfs/go/glog" -) - -type VidInfo struct { - Locations []Location - NextRefreshTime time.Time -} -type VidCache struct { - cache []VidInfo -} - -func (vc *VidCache) Get(vid string) ([]Location, error) { - id, err := strconv.Atoi(vid) - if err != nil { - glog.V(1).Infof("Unknown volume id %s", vid) - return nil, err - } - if 0 < id && id <= len(vc.cache) { - if vc.cache[id-1].Locations == nil { - return nil, errors.New("Not Set") - } - if vc.cache[id-1].NextRefreshTime.Before(time.Now()) { - return nil, errors.New("Expired") - } - return vc.cache[id-1].Locations, nil - } - return nil, errors.New("Not Found") -} -func (vc *VidCache) Set(vid string, locations []Location, duration time.Duration) { - id, err := strconv.Atoi(vid) - if err != nil { - glog.V(1).Infof("Unknown volume id %s", vid) - return - } - if id > len(vc.cache) { - for i := id - len(vc.cache); i > 0; i-- { - vc.cache = append(vc.cache, VidInfo{}) - } - } - if id > 0 { - vc.cache[id-1].Locations = locations - vc.cache[id-1].NextRefreshTime = time.Now().Add(duration) - } -} diff --git a/go/operation/lookup_vid_cache_test.go b/go/operation/lookup_vid_cache_test.go deleted file mode 100644 index 9c9e2affb..000000000 --- a/go/operation/lookup_vid_cache_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package operation - -import ( - "fmt" - "testing" - "time" -) - -func TestCaching(t *testing.T) { - var ( - vc VidCache - ) - var locations []Location - locations = append(locations, Location{Url: "a.com:8080"}) - vc.Set("123", locations, time.Second) - ret, _ := vc.Get("123") - if ret == nil { - t.Fatal("Not found vid 123") - } - fmt.Printf("vid 123 locations = %v\n", ret) - time.Sleep(2 * time.Second) - ret, _ = vc.Get("123") - if ret != nil { - t.Fatal("Not found vid 123") - } -} diff --git a/go/operation/submit.go b/go/operation/submit.go deleted file mode 100644 index 18484680a..000000000 --- a/go/operation/submit.go +++ /dev/null @@ -1,194 +0,0 @@ -package operation - -import ( - "bytes" - "io" - "mime" - "net/url" - "os" - "path" - "strconv" - "strings" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/security" -) - -type FilePart struct { - Reader io.Reader - FileName string - FileSize int64 - IsGzipped bool - MimeType string - ModTime int64 //in seconds - Replication string - Collection string - Ttl string - Server string //this comes from assign result - Fid string //this comes from assign result, but customizable -} - -type SubmitResult struct { - FileName string `json:"fileName,omitempty"` - FileUrl string `json:"fileUrl,omitempty"` - Fid string `json:"fid,omitempty"` - Size uint32 `json:"size,omitempty"` - Error string `json:"error,omitempty"` -} - -func SubmitFiles(master string, files []FilePart, - replication string, collection string, ttl string, maxMB int, - secret security.Secret, -) ([]SubmitResult, error) { - results := make([]SubmitResult, len(files)) - for index, file := range files { - results[index].FileName = file.FileName - } - ret, err := Assign(master, uint64(len(files)), replication, collection, ttl) - if err != nil { - for index, _ := range files { - results[index].Error = err.Error() - } - return results, err - } - for index, file := range files { - file.Fid = ret.Fid - if index > 0 { - file.Fid = file.Fid + "_" + strconv.Itoa(index) - } - file.Server = ret.Url - file.Replication = replication - file.Collection = collection - results[index].Size, err = file.Upload(maxMB, master, secret) - if err != nil { - results[index].Error = err.Error() - } - results[index].Fid = file.Fid - results[index].FileUrl = ret.PublicUrl + "/" + file.Fid - } - return results, nil -} - -func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) { - ret = make([]FilePart, len(fullPathFilenames)) - for index, file := range fullPathFilenames { - if ret[index], err = newFilePart(file); err != nil { - return - } - } - return -} -func newFilePart(fullPathFilename string) (ret FilePart, err error) { - fh, openErr := os.Open(fullPathFilename) - if openErr != nil { - glog.V(0).Info("Failed to open file: ", fullPathFilename) - return ret, openErr - } - ret.Reader = fh - - if fi, fiErr := fh.Stat(); fiErr != nil { - glog.V(0).Info("Failed to stat file:", fullPathFilename) - return ret, fiErr - } else { - ret.ModTime = fi.ModTime().UTC().Unix() - ret.FileSize = fi.Size() - } - ext := strings.ToLower(path.Ext(fullPathFilename)) - ret.IsGzipped = ext == ".gz" - if ret.IsGzipped { - ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3] - } - ret.FileName = fullPathFilename - if ext != "" { - ret.MimeType = mime.TypeByExtension(ext) - } - - return ret, nil -} - -func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) { - jwt := security.GenJwt(secret, fi.Fid) - fileUrl := "http://" + fi.Server + "/" + fi.Fid - if fi.ModTime != 0 { - fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) - } - if closer, ok := fi.Reader.(io.Closer); ok { - defer closer.Close() - } - baseName := path.Base(fi.FileName) - if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) { - chunkSize := int64(maxMB * 1024 * 1024) - chunks := fi.FileSize/chunkSize + 1 - cm := ChunkManifest{ - Name: baseName, - Size: fi.FileSize, - Mime: fi.MimeType, - Chunks: make([]*ChunkInfo, 0, chunks), - } - - for i := int64(0); i < chunks; i++ { - id, count, e := upload_one_chunk( - baseName+"-"+strconv.FormatInt(i+1, 10), - io.LimitReader(fi.Reader, chunkSize), - master, fi.Replication, fi.Collection, fi.Ttl, - jwt) - if e != nil { - // delete all uploaded chunks - cm.DeleteChunks(master) - return 0, e - } - cm.Chunks = append(cm.Chunks, - &ChunkInfo{ - Offset: i * chunkSize, - Size: int64(count), - Fid: id, - }, - ) - retSize += count - } - err = upload_chunked_file_manifest(fileUrl, &cm, jwt) - if err != nil { - // delete all uploaded chunks - cm.DeleteChunks(master) - } - } else { - ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt) - if e != nil { - return 0, e - } - return ret.Size, e - } - return -} - -func upload_one_chunk(filename string, reader io.Reader, master, - replication string, collection string, ttl string, jwt security.EncodedJwt, -) (fid string, size uint32, e error) { - ret, err := Assign(master, 1, replication, collection, ttl) - if err != nil { - return "", 0, err - } - fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid - glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") - uploadResult, uploadError := Upload(fileUrl, filename, reader, false, - "application/octet-stream", jwt) - if uploadError != nil { - return fid, 0, uploadError - } - return fid, uploadResult.Size, nil -} - -func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error { - buf, e := manifest.Marshal() - if e != nil { - return e - } - bufReader := bytes.NewReader(buf) - glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...") - u, _ := url.Parse(fileUrl) - q := u.Query() - q.Set("cm", "true") - u.RawQuery = q.Encode() - _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt) - return e -} diff --git a/go/operation/sync_volume.go b/go/operation/sync_volume.go deleted file mode 100644 index 54944a64e..000000000 --- a/go/operation/sync_volume.go +++ /dev/null @@ -1,54 +0,0 @@ -package operation - -import ( - "encoding/json" - "fmt" - "net/url" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" -) - -type SyncVolumeResponse struct { - Replication string `json:"Replication,omitempty"` - Ttl string `json:"Ttl,omitempty"` - TailOffset uint64 `json:"TailOffset,omitempty"` - CompactRevision uint16 `json:"CompactRevision,omitempty"` - IdxFileSize uint64 `json:"IdxFileSize,omitempty"` - Error string `json:"error,omitempty"` -} - -func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) { - values := make(url.Values) - values.Add("volume", vid) - jsonBlob, err := util.Post("http://"+server+"/admin/sync/status", values) - glog.V(2).Info("sync volume result :", string(jsonBlob)) - if err != nil { - return nil, err - } - var ret SyncVolumeResponse - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Error != "" { - return nil, fmt.Errorf("Volume %s get sync status error: %s", vid, ret.Error) - } - return &ret, nil -} - -func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key uint64, offset, size uint32)) error { - values := make(url.Values) - values.Add("volume", vid) - line := make([]byte, 16) - err := util.GetBufferStream("http://"+server+"/admin/sync/index", values, line, func(bytes []byte) { - key := util.BytesToUint64(bytes[:8]) - offset := util.BytesToUint32(bytes[8:12]) - size := util.BytesToUint32(bytes[12:16]) - eachEntryFn(key, offset, size) - }) - if err != nil { - return err - } - return nil -} diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go deleted file mode 100644 index 742a1ca4e..000000000 --- a/go/operation/system_message.pb.go +++ /dev/null @@ -1,203 +0,0 @@ -// Code generated by protoc-gen-go. -// source: system_message.proto -// DO NOT EDIT! - -/* -Package operation is a generated protocol buffer package. - -It is generated from these files: - system_message.proto - -It has these top-level messages: - VolumeInformationMessage - JoinMessage -*/ -package operation - -import proto "github.com/golang/protobuf/proto" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = math.Inf - -type VolumeInformationMessage struct { - Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"` - Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"` - Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` - FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"` - DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"` - DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"` - ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"` - ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"` - Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"` - Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} } -func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) } -func (*VolumeInformationMessage) ProtoMessage() {} - -const Default_VolumeInformationMessage_Version uint32 = 2 - -func (m *VolumeInformationMessage) GetId() uint32 { - if m != nil && m.Id != nil { - return *m.Id - } - return 0 -} - -func (m *VolumeInformationMessage) GetSize() uint64 { - if m != nil && m.Size != nil { - return *m.Size - } - return 0 -} - -func (m *VolumeInformationMessage) GetCollection() string { - if m != nil && m.Collection != nil { - return *m.Collection - } - return "" -} - -func (m *VolumeInformationMessage) GetFileCount() uint64 { - if m != nil && m.FileCount != nil { - return *m.FileCount - } - return 0 -} - -func (m *VolumeInformationMessage) GetDeleteCount() uint64 { - if m != nil && m.DeleteCount != nil { - return *m.DeleteCount - } - return 0 -} - -func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 { - if m != nil && m.DeletedByteCount != nil { - return *m.DeletedByteCount - } - return 0 -} - -func (m *VolumeInformationMessage) GetReadOnly() bool { - if m != nil && m.ReadOnly != nil { - return *m.ReadOnly - } - return false -} - -func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 { - if m != nil && m.ReplicaPlacement != nil { - return *m.ReplicaPlacement - } - return 0 -} - -func (m *VolumeInformationMessage) GetVersion() uint32 { - if m != nil && m.Version != nil { - return *m.Version - } - return Default_VolumeInformationMessage_Version -} - -func (m *VolumeInformationMessage) GetTtl() uint32 { - if m != nil && m.Ttl != nil { - return *m.Ttl - } - return 0 -} - -type JoinMessage struct { - IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"` - Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"` - Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"` - PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"` - MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"` - MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"` - DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"` - Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"` - Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"` - AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *JoinMessage) Reset() { *m = JoinMessage{} } -func (m *JoinMessage) String() string { return proto.CompactTextString(m) } -func (*JoinMessage) ProtoMessage() {} - -func (m *JoinMessage) GetIsInit() bool { - if m != nil && m.IsInit != nil { - return *m.IsInit - } - return false -} - -func (m *JoinMessage) GetIp() string { - if m != nil && m.Ip != nil { - return *m.Ip - } - return "" -} - -func (m *JoinMessage) GetPort() uint32 { - if m != nil && m.Port != nil { - return *m.Port - } - return 0 -} - -func (m *JoinMessage) GetPublicUrl() string { - if m != nil && m.PublicUrl != nil { - return *m.PublicUrl - } - return "" -} - -func (m *JoinMessage) GetMaxVolumeCount() uint32 { - if m != nil && m.MaxVolumeCount != nil { - return *m.MaxVolumeCount - } - return 0 -} - -func (m *JoinMessage) GetMaxFileKey() uint64 { - if m != nil && m.MaxFileKey != nil { - return *m.MaxFileKey - } - return 0 -} - -func (m *JoinMessage) GetDataCenter() string { - if m != nil && m.DataCenter != nil { - return *m.DataCenter - } - return "" -} - -func (m *JoinMessage) GetRack() string { - if m != nil && m.Rack != nil { - return *m.Rack - } - return "" -} - -func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage { - if m != nil { - return m.Volumes - } - return nil -} - -func (m *JoinMessage) GetAdminPort() uint32 { - if m != nil && m.AdminPort != nil { - return *m.AdminPort - } - return 0 -} - -func init() { -} diff --git a/go/operation/system_message_test.go b/go/operation/system_message_test.go deleted file mode 100644 index d18ca49a4..000000000 --- a/go/operation/system_message_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package operation - -import ( - "encoding/json" - "log" - "testing" - - "github.com/golang/protobuf/proto" -) - -func TestSerialDeserial(t *testing.T) { - volumeMessage := &VolumeInformationMessage{ - Id: proto.Uint32(12), - Size: proto.Uint64(2341234), - Collection: proto.String("benchmark"), - FileCount: proto.Uint64(2341234), - DeleteCount: proto.Uint64(234), - DeletedByteCount: proto.Uint64(21234), - ReadOnly: proto.Bool(false), - ReplicaPlacement: proto.Uint32(210), - Version: proto.Uint32(2), - } - var volumeMessages []*VolumeInformationMessage - volumeMessages = append(volumeMessages, volumeMessage) - - joinMessage := &JoinMessage{ - IsInit: proto.Bool(true), - Ip: proto.String("127.0.3.12"), - Port: proto.Uint32(34546), - PublicUrl: proto.String("localhost:2342"), - MaxVolumeCount: proto.Uint32(210), - MaxFileKey: proto.Uint64(324234423), - DataCenter: proto.String("dc1"), - Rack: proto.String("rack2"), - Volumes: volumeMessages, - } - - data, err := proto.Marshal(joinMessage) - if err != nil { - log.Fatal("marshaling error: ", err) - } - newMessage := &JoinMessage{} - err = proto.Unmarshal(data, newMessage) - if err != nil { - log.Fatal("unmarshaling error: ", err) - } - log.Println("The pb data size is", len(data)) - - jsonData, jsonError := json.Marshal(joinMessage) - if jsonError != nil { - log.Fatal("json marshaling error: ", jsonError) - } - log.Println("The json data size is", len(jsonData), string(jsonData)) - - // Now test and newTest contain the same data. - if *joinMessage.PublicUrl != *newMessage.PublicUrl { - log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl) - } -} diff --git a/go/operation/upload_content.go b/go/operation/upload_content.go deleted file mode 100644 index c3fd5f4b1..000000000 --- a/go/operation/upload_content.go +++ /dev/null @@ -1,96 +0,0 @@ -package operation - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "mime" - "mime/multipart" - "net/http" - "net/textproto" - "path/filepath" - "strings" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/security" -) - -type UploadResult struct { - Name string `json:"name,omitempty"` - Size uint32 `json:"size,omitempty"` - Error string `json:"error,omitempty"` -} - -var ( - client *http.Client -) - -func init() { - client = &http.Client{Transport: &http.Transport{ - MaxIdleConnsPerHost: 1024, - }} -} - -var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") - -func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) { - return upload_content(uploadUrl, func(w io.Writer) (err error) { - _, err = io.Copy(w, reader) - return - }, filename, isGzipped, mtype, jwt) -} -func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) { - body_buf := bytes.NewBufferString("") - body_writer := multipart.NewWriter(body_buf) - h := make(textproto.MIMEHeader) - h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename))) - if mtype == "" { - mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) - } - if mtype != "" { - h.Set("Content-Type", mtype) - } - if isGzipped { - h.Set("Content-Encoding", "gzip") - } - if jwt != "" { - h.Set("Authorization", "BEARER "+string(jwt)) - } - file_writer, cp_err := body_writer.CreatePart(h) - if cp_err != nil { - glog.V(0).Infoln("error creating form file", cp_err.Error()) - return nil, cp_err - } - if err := fillBufferFunction(file_writer); err != nil { - glog.V(0).Infoln("error copying data", err) - return nil, err - } - content_type := body_writer.FormDataContentType() - if err := body_writer.Close(); err != nil { - glog.V(0).Infoln("error closing body", err) - return nil, err - } - resp, post_err := client.Post(uploadUrl, content_type, body_buf) - if post_err != nil { - glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error()) - return nil, post_err - } - defer resp.Body.Close() - resp_body, ra_err := ioutil.ReadAll(resp.Body) - if ra_err != nil { - return nil, ra_err - } - var ret UploadResult - unmarshal_err := json.Unmarshal(resp_body, &ret) - if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload resonse", uploadUrl, string(resp_body)) - return nil, unmarshal_err - } - if ret.Error != "" { - return nil, errors.New(ret.Error) - } - return &ret, nil -} |
