diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 48 | ||||
| -rw-r--r-- | weed/operation/chunked_file.go | 213 | ||||
| -rw-r--r-- | weed/operation/compress.go | 59 | ||||
| -rw-r--r-- | weed/operation/data_struts.go | 7 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 117 | ||||
| -rw-r--r-- | weed/operation/list_masters.go | 32 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 118 | ||||
| -rw-r--r-- | weed/operation/lookup_vid_cache.go | 51 | ||||
| -rw-r--r-- | weed/operation/lookup_vid_cache_test.go | 26 | ||||
| -rw-r--r-- | weed/operation/submit.go | 194 | ||||
| -rw-r--r-- | weed/operation/sync_volume.go | 54 | ||||
| -rw-r--r-- | weed/operation/system_message.pb.go | 203 | ||||
| -rw-r--r-- | weed/operation/system_message_test.go | 59 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 96 |
14 files changed, 1277 insertions, 0 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go new file mode 100644 index 000000000..acc2d3034 --- /dev/null +++ b/weed/operation/assign_file_id.go @@ -0,0 +1,48 @@ +package operation + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "strconv" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type AssignResult struct { + Fid string `json:"fid,omitempty"` + Url string `json:"url,omitempty"` + PublicUrl string `json:"publicUrl,omitempty"` + Count uint64 `json:"count,omitempty"` + Error string `json:"error,omitempty"` +} + +func Assign(server string, count uint64, replication string, collection string, ttl string) (*AssignResult, error) { + values := make(url.Values) + values.Add("count", strconv.FormatUint(count, 10)) + if replication != "" { + values.Add("replication", replication) + } + if collection != "" { + values.Add("collection", collection) + } + if ttl != "" { + values.Add("ttl", ttl) + } + jsonBlob, err := util.Post("http://"+server+"/dir/assign", values) + glog.V(2).Info("assign result :", string(jsonBlob)) + if err != nil { + return nil, err + } + var ret AssignResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, fmt.Errorf("/dir/assign result JSON unmarshal error:%v, json:%s", err, string(jsonBlob)) + } + if ret.Count <= 0 { + return nil, errors.New(ret.Error) + } + return &ret, nil +} diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go new file mode 100644 index 000000000..52086514a --- /dev/null +++ b/weed/operation/chunked_file.go @@ -0,0 +1,213 @@ +package operation + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "sort" + + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + // when the remote server does not allow range requests (Accept-Ranges was not set) + ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server") + // ErrInvalidRange is returned by Read when trying to read past the end of the file + ErrInvalidRange = errors.New("Invalid range") +) + +type ChunkInfo struct { + Fid string `json:"fid"` + Offset int64 `json:"offset"` + Size int64 `json:"size"` +} + +type ChunkList []*ChunkInfo + +type ChunkManifest struct { + Name string `json:"name,omitempty"` + Mime string `json:"mime,omitempty"` + Size int64 `json:"size,omitempty"` + Chunks ChunkList `json:"chunks,omitempty"` +} + +// seekable chunked file reader +type ChunkedFileReader struct { + Manifest *ChunkManifest + Master string + pos int64 + pr *io.PipeReader + pw *io.PipeWriter + mutex sync.Mutex +} + +func (s ChunkList) Len() int { return len(s) } +func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset } +func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) { + if isGzipped { + var err error + if buffer, err = UnGzipData(buffer); err != nil { + return nil, err + } + } + cm := ChunkManifest{} + if e := json.Unmarshal(buffer, &cm); e != nil { + return nil, e + } + sort.Sort(cm.Chunks) + return &cm, nil +} + +func (cm *ChunkManifest) Marshal() ([]byte, error) { + return json.Marshal(cm) +} + +func (cm *ChunkManifest) DeleteChunks(master string) error { + deleteError := 0 + for _, ci := range cm.Chunks { + if e := DeleteFile(master, ci.Fid, ""); e != nil { + deleteError++ + glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master) + } + } + if deleteError > 0 { + return errors.New("Not all chunks deleted.") + } + return nil +} + +func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) { + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return written, err + } + if offset > 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) + } + + resp, err := util.Do(req) + if err != nil { + return written, err + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusRequestedRangeNotSatisfiable: + return written, ErrInvalidRange + case http.StatusOK: + if offset > 0 { + return written, ErrRangeRequestsNotSupported + } + case http.StatusPartialContent: + break + default: + return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl) + + } + return io.Copy(w, resp.Body) +} + +func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) { + var err error + switch whence { + case 0: + case 1: + offset += cf.pos + case 2: + offset = cf.Manifest.Size - offset + } + if offset > cf.Manifest.Size { + err = ErrInvalidRange + } + if cf.pos != offset { + cf.Close() + } + cf.pos = offset + return cf.pos, err +} + +func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { + cm := cf.Manifest + chunkIndex := -1 + chunkStartOffset := int64(0) + for i, ci := range cm.Chunks { + if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size { + chunkIndex = i + chunkStartOffset = cf.pos - ci.Offset + break + } + } + if chunkIndex < 0 { + return n, ErrInvalidRange + } + for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ { + ci := cm.Chunks[chunkIndex] + // if we need read date from local volume server first? + fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid) + if lookupError != nil { + return n, lookupError + } + if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset); e != nil { + return n, e + } else { + n += wn + cf.pos += wn + } + + chunkStartOffset = 0 + } + return n, nil +} + +func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) { + cf.Seek(off, 0) + return cf.Read(p) +} + +func (cf *ChunkedFileReader) Read(p []byte) (int, error) { + return cf.getPipeReader().Read(p) +} + +func (cf *ChunkedFileReader) Close() (e error) { + cf.mutex.Lock() + defer cf.mutex.Unlock() + return cf.closePipe() +} + +func (cf *ChunkedFileReader) closePipe() (e error) { + if cf.pr != nil { + if err := cf.pr.Close(); err != nil { + e = err + } + } + cf.pr = nil + if cf.pw != nil { + if err := cf.pw.Close(); err != nil { + e = err + } + } + cf.pw = nil + return e +} + +func (cf *ChunkedFileReader) getPipeReader() io.Reader { + cf.mutex.Lock() + defer cf.mutex.Unlock() + if cf.pr != nil && cf.pw != nil { + return cf.pr + } + cf.closePipe() + cf.pr, cf.pw = io.Pipe() + go func(pw *io.PipeWriter) { + _, e := cf.WriteTo(pw) + pw.CloseWithError(e) + }(cf.pw) + return cf.pr +} diff --git a/weed/operation/compress.go b/weed/operation/compress.go new file mode 100644 index 000000000..de62e5bf7 --- /dev/null +++ b/weed/operation/compress.go @@ -0,0 +1,59 @@ +package operation + +import ( + "bytes" + "compress/flate" + "compress/gzip" + "io/ioutil" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +/* +* Default more not to gzip since gzip can be done on client side. + */ +func IsGzippable(ext, mtype string) bool { + if strings.HasPrefix(mtype, "text/") { + return true + } + switch ext { + case ".zip", ".rar", ".gz", ".bz2", ".xz": + return false + case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json": + return true + } + if strings.HasPrefix(mtype, "application/") { + if strings.HasSuffix(mtype, "xml") { + return true + } + if strings.HasSuffix(mtype, "script") { + return true + } + } + return false +} + +func GzipData(input []byte) ([]byte, error) { + buf := new(bytes.Buffer) + w, _ := gzip.NewWriterLevel(buf, flate.BestCompression) + if _, err := w.Write(input); err != nil { + glog.V(2).Infoln("error compressing data:", err) + return nil, err + } + if err := w.Close(); err != nil { + glog.V(2).Infoln("error closing compressed data:", err) + return nil, err + } + return buf.Bytes(), nil +} +func UnGzipData(input []byte) ([]byte, error) { + buf := bytes.NewBuffer(input) + r, _ := gzip.NewReader(buf) + defer r.Close() + output, err := ioutil.ReadAll(r) + if err != nil { + glog.V(2).Infoln("error uncompressing data:", err) + } + return output, err +} diff --git a/weed/operation/data_struts.go b/weed/operation/data_struts.go new file mode 100644 index 000000000..bfc53aa50 --- /dev/null +++ b/weed/operation/data_struts.go @@ -0,0 +1,7 @@ +package operation + +type JoinResult struct { + VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"` + SecretKey string `json:"secretKey,omitempty"` + Error string `json:"error,omitempty"` +} diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go new file mode 100644 index 000000000..b78221da1 --- /dev/null +++ b/weed/operation/delete_content.go @@ -0,0 +1,117 @@ +package operation + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "strings" + "sync" + + "net/http" + + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type DeleteResult struct { + Fid string `json:"fid"` + Size int `json:"size"` + Status int `json:"status"` + Error string `json:"error,omitempty"` +} + +func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error { + fileUrl, err := LookupFileId(master, fileId) + if err != nil { + return fmt.Errorf("Failed to lookup %s:%v", fileId, err) + } + err = util.Delete(fileUrl, jwt) + if err != nil { + return fmt.Errorf("Failed to delete %s:%v", fileUrl, err) + } + return nil +} + +func ParseFileId(fid string) (vid string, key_cookie string, err error) { + commaIndex := strings.Index(fid, ",") + if commaIndex <= 0 { + return "", "", errors.New("Wrong fid format.") + } + return fid[:commaIndex], fid[commaIndex+1:], nil +} + +type DeleteFilesResult struct { + Errors []string + Results []DeleteResult +} + +func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { + vid_to_fileIds := make(map[string][]string) + ret := &DeleteFilesResult{} + var vids []string + for _, fileId := range fileIds { + vid, _, err := ParseFileId(fileId) + if err != nil { + ret.Results = append(ret.Results, DeleteResult{ + Fid: vid, + Status: http.StatusBadRequest, + Error: err.Error()}, + ) + continue + } + if _, ok := vid_to_fileIds[vid]; !ok { + vid_to_fileIds[vid] = make([]string, 0) + vids = append(vids, vid) + } + vid_to_fileIds[vid] = append(vid_to_fileIds[vid], fileId) + } + + lookupResults, err := LookupVolumeIds(master, vids) + if err != nil { + return ret, err + } + + server_to_fileIds := make(map[string][]string) + for vid, result := range lookupResults { + if result.Error != "" { + ret.Errors = append(ret.Errors, result.Error) + continue + } + for _, location := range result.Locations { + if _, ok := server_to_fileIds[location.Url]; !ok { + server_to_fileIds[location.Url] = make([]string, 0) + } + server_to_fileIds[location.Url] = append( + server_to_fileIds[location.Url], vid_to_fileIds[vid]...) + } + } + + var wg sync.WaitGroup + + for server, fidList := range server_to_fileIds { + wg.Add(1) + go func(server string, fidList []string) { + defer wg.Done() + values := make(url.Values) + for _, fid := range fidList { + values.Add("fid", fid) + } + jsonBlob, err := util.Post("http://"+server+"/delete", values) + if err != nil { + ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) + return + } + var result []DeleteResult + err = json.Unmarshal(jsonBlob, &result) + if err != nil { + ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) + return + } + ret.Results = append(ret.Results, result...) + }(server, fidList) + } + wg.Wait() + + return ret, nil +} diff --git a/weed/operation/list_masters.go b/weed/operation/list_masters.go new file mode 100644 index 000000000..0a15b0af8 --- /dev/null +++ b/weed/operation/list_masters.go @@ -0,0 +1,32 @@ +package operation + +import ( + "encoding/json" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type ClusterStatusResult struct { + IsLeader bool `json:"IsLeader,omitempty"` + Leader string `json:"Leader,omitempty"` + Peers []string `json:"Peers,omitempty"` +} + +func ListMasters(server string) ([]string, error) { + jsonBlob, err := util.Get("http://" + server + "/cluster/status") + glog.V(2).Info("list masters result :", string(jsonBlob)) + if err != nil { + return nil, err + } + var ret ClusterStatusResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + masters := ret.Peers + if ret.IsLeader { + masters = append(masters, ret.Leader) + } + return masters, nil +} diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go new file mode 100644 index 000000000..19d9dbb94 --- /dev/null +++ b/weed/operation/lookup.go @@ -0,0 +1,118 @@ +package operation + +import ( + "encoding/json" + "errors" + "fmt" + "math/rand" + "net/url" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/util" +) + +type Location struct { + Url string `json:"url,omitempty"` + PublicUrl string `json:"publicUrl,omitempty"` +} +type LookupResult struct { + VolumeId string `json:"volumeId,omitempty"` + Locations []Location `json:"locations,omitempty"` + Error string `json:"error,omitempty"` +} + +func (lr *LookupResult) String() string { + return fmt.Sprintf("VolumeId:%s, Locations:%v, Error:%s", lr.VolumeId, lr.Locations, lr.Error) +} + +var ( + vc VidCache // caching of volume locations, re-check if after 10 minutes +) + +func Lookup(server string, vid string) (ret *LookupResult, err error) { + locations, cache_err := vc.Get(vid) + if cache_err != nil { + if ret, err = do_lookup(server, vid); err == nil { + vc.Set(vid, ret.Locations, 10*time.Minute) + } + } else { + ret = &LookupResult{VolumeId: vid, Locations: locations} + } + return +} + +func do_lookup(server string, vid string) (*LookupResult, error) { + values := make(url.Values) + values.Add("volumeId", vid) + jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) + if err != nil { + return nil, err + } + var ret LookupResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Error != "" { + return nil, errors.New(ret.Error) + } + return &ret, nil +} + +func LookupFileId(server string, fileId string) (fullUrl string, err error) { + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + return "", errors.New("Invalid fileId " + fileId) + } + lookup, lookupError := Lookup(server, parts[0]) + if lookupError != nil { + return "", lookupError + } + if len(lookup.Locations) == 0 { + return "", errors.New("File Not Found") + } + return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil +} + +// LookupVolumeIds find volume locations by cache and actual lookup +func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) { + ret := make(map[string]LookupResult) + var unknown_vids []string + + //check vid cache first + for _, vid := range vids { + locations, cache_err := vc.Get(vid) + if cache_err == nil { + ret[vid] = LookupResult{VolumeId: vid, Locations: locations} + } else { + unknown_vids = append(unknown_vids, vid) + } + } + //return success if all volume ids are known + if len(unknown_vids) == 0 { + return ret, nil + } + + //only query unknown_vids + values := make(url.Values) + for _, vid := range unknown_vids { + values.Add("volumeId", vid) + } + jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values) + if err != nil { + return nil, err + } + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, errors.New(err.Error() + " " + string(jsonBlob)) + } + + //set newly checked vids to cache + for _, vid := range unknown_vids { + locations := ret[vid].Locations + vc.Set(vid, locations, 10*time.Minute) + } + + return ret, nil +} diff --git a/weed/operation/lookup_vid_cache.go b/weed/operation/lookup_vid_cache.go new file mode 100644 index 000000000..1ed03613d --- /dev/null +++ b/weed/operation/lookup_vid_cache.go @@ -0,0 +1,51 @@ +package operation + +import ( + "errors" + "strconv" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type VidInfo struct { + Locations []Location + NextRefreshTime time.Time +} +type VidCache struct { + cache []VidInfo +} + +func (vc *VidCache) Get(vid string) ([]Location, error) { + id, err := strconv.Atoi(vid) + if err != nil { + glog.V(1).Infof("Unknown volume id %s", vid) + return nil, err + } + if 0 < id && id <= len(vc.cache) { + if vc.cache[id-1].Locations == nil { + return nil, errors.New("Not Set") + } + if vc.cache[id-1].NextRefreshTime.Before(time.Now()) { + return nil, errors.New("Expired") + } + return vc.cache[id-1].Locations, nil + } + return nil, errors.New("Not Found") +} +func (vc *VidCache) Set(vid string, locations []Location, duration time.Duration) { + id, err := strconv.Atoi(vid) + if err != nil { + glog.V(1).Infof("Unknown volume id %s", vid) + return + } + if id > len(vc.cache) { + for i := id - len(vc.cache); i > 0; i-- { + vc.cache = append(vc.cache, VidInfo{}) + } + } + if id > 0 { + vc.cache[id-1].Locations = locations + vc.cache[id-1].NextRefreshTime = time.Now().Add(duration) + } +} diff --git a/weed/operation/lookup_vid_cache_test.go b/weed/operation/lookup_vid_cache_test.go new file mode 100644 index 000000000..9c9e2affb --- /dev/null +++ b/weed/operation/lookup_vid_cache_test.go @@ -0,0 +1,26 @@ +package operation + +import ( + "fmt" + "testing" + "time" +) + +func TestCaching(t *testing.T) { + var ( + vc VidCache + ) + var locations []Location + locations = append(locations, Location{Url: "a.com:8080"}) + vc.Set("123", locations, time.Second) + ret, _ := vc.Get("123") + if ret == nil { + t.Fatal("Not found vid 123") + } + fmt.Printf("vid 123 locations = %v\n", ret) + time.Sleep(2 * time.Second) + ret, _ = vc.Get("123") + if ret != nil { + t.Fatal("Not found vid 123") + } +} diff --git a/weed/operation/submit.go b/weed/operation/submit.go new file mode 100644 index 000000000..19bbd7a70 --- /dev/null +++ b/weed/operation/submit.go @@ -0,0 +1,194 @@ +package operation + +import ( + "bytes" + "io" + "mime" + "net/url" + "os" + "path" + "strconv" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" +) + +type FilePart struct { + Reader io.Reader + FileName string + FileSize int64 + IsGzipped bool + MimeType string + ModTime int64 //in seconds + Replication string + Collection string + Ttl string + Server string //this comes from assign result + Fid string //this comes from assign result, but customizable +} + +type SubmitResult struct { + FileName string `json:"fileName,omitempty"` + FileUrl string `json:"fileUrl,omitempty"` + Fid string `json:"fid,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` +} + +func SubmitFiles(master string, files []FilePart, + replication string, collection string, ttl string, maxMB int, + secret security.Secret, +) ([]SubmitResult, error) { + results := make([]SubmitResult, len(files)) + for index, file := range files { + results[index].FileName = file.FileName + } + ret, err := Assign(master, uint64(len(files)), replication, collection, ttl) + if err != nil { + for index, _ := range files { + results[index].Error = err.Error() + } + return results, err + } + for index, file := range files { + file.Fid = ret.Fid + if index > 0 { + file.Fid = file.Fid + "_" + strconv.Itoa(index) + } + file.Server = ret.Url + file.Replication = replication + file.Collection = collection + results[index].Size, err = file.Upload(maxMB, master, secret) + if err != nil { + results[index].Error = err.Error() + } + results[index].Fid = file.Fid + results[index].FileUrl = ret.PublicUrl + "/" + file.Fid + } + return results, nil +} + +func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) { + ret = make([]FilePart, len(fullPathFilenames)) + for index, file := range fullPathFilenames { + if ret[index], err = newFilePart(file); err != nil { + return + } + } + return +} +func newFilePart(fullPathFilename string) (ret FilePart, err error) { + fh, openErr := os.Open(fullPathFilename) + if openErr != nil { + glog.V(0).Info("Failed to open file: ", fullPathFilename) + return ret, openErr + } + ret.Reader = fh + + if fi, fiErr := fh.Stat(); fiErr != nil { + glog.V(0).Info("Failed to stat file:", fullPathFilename) + return ret, fiErr + } else { + ret.ModTime = fi.ModTime().UTC().Unix() + ret.FileSize = fi.Size() + } + ext := strings.ToLower(path.Ext(fullPathFilename)) + ret.IsGzipped = ext == ".gz" + if ret.IsGzipped { + ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3] + } + ret.FileName = fullPathFilename + if ext != "" { + ret.MimeType = mime.TypeByExtension(ext) + } + + return ret, nil +} + +func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) { + jwt := security.GenJwt(secret, fi.Fid) + fileUrl := "http://" + fi.Server + "/" + fi.Fid + if fi.ModTime != 0 { + fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) + } + if closer, ok := fi.Reader.(io.Closer); ok { + defer closer.Close() + } + baseName := path.Base(fi.FileName) + if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) { + chunkSize := int64(maxMB * 1024 * 1024) + chunks := fi.FileSize/chunkSize + 1 + cm := ChunkManifest{ + Name: baseName, + Size: fi.FileSize, + Mime: fi.MimeType, + Chunks: make([]*ChunkInfo, 0, chunks), + } + + for i := int64(0); i < chunks; i++ { + id, count, e := upload_one_chunk( + baseName+"-"+strconv.FormatInt(i+1, 10), + io.LimitReader(fi.Reader, chunkSize), + master, fi.Replication, fi.Collection, fi.Ttl, + jwt) + if e != nil { + // delete all uploaded chunks + cm.DeleteChunks(master) + return 0, e + } + cm.Chunks = append(cm.Chunks, + &ChunkInfo{ + Offset: i * chunkSize, + Size: int64(count), + Fid: id, + }, + ) + retSize += count + } + err = upload_chunked_file_manifest(fileUrl, &cm, jwt) + if err != nil { + // delete all uploaded chunks + cm.DeleteChunks(master) + } + } else { + ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt) + if e != nil { + return 0, e + } + return ret.Size, e + } + return +} + +func upload_one_chunk(filename string, reader io.Reader, master, + replication string, collection string, ttl string, jwt security.EncodedJwt, +) (fid string, size uint32, e error) { + ret, err := Assign(master, 1, replication, collection, ttl) + if err != nil { + return "", 0, err + } + fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid + glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") + uploadResult, uploadError := Upload(fileUrl, filename, reader, false, + "application/octet-stream", jwt) + if uploadError != nil { + return fid, 0, uploadError + } + return fid, uploadResult.Size, nil +} + +func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error { + buf, e := manifest.Marshal() + if e != nil { + return e + } + bufReader := bytes.NewReader(buf) + glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...") + u, _ := url.Parse(fileUrl) + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt) + return e +} diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go new file mode 100644 index 000000000..b7a727fc7 --- /dev/null +++ b/weed/operation/sync_volume.go @@ -0,0 +1,54 @@ +package operation + +import ( + "encoding/json" + "fmt" + "net/url" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type SyncVolumeResponse struct { + Replication string `json:"Replication,omitempty"` + Ttl string `json:"Ttl,omitempty"` + TailOffset uint64 `json:"TailOffset,omitempty"` + CompactRevision uint16 `json:"CompactRevision,omitempty"` + IdxFileSize uint64 `json:"IdxFileSize,omitempty"` + Error string `json:"error,omitempty"` +} + +func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) { + values := make(url.Values) + values.Add("volume", vid) + jsonBlob, err := util.Post("http://"+server+"/admin/sync/status", values) + glog.V(2).Info("sync volume result :", string(jsonBlob)) + if err != nil { + return nil, err + } + var ret SyncVolumeResponse + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Error != "" { + return nil, fmt.Errorf("Volume %s get sync status error: %s", vid, ret.Error) + } + return &ret, nil +} + +func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key uint64, offset, size uint32)) error { + values := make(url.Values) + values.Add("volume", vid) + line := make([]byte, 16) + err := util.GetBufferStream("http://"+server+"/admin/sync/index", values, line, func(bytes []byte) { + key := util.BytesToUint64(bytes[:8]) + offset := util.BytesToUint32(bytes[8:12]) + size := util.BytesToUint32(bytes[12:16]) + eachEntryFn(key, offset, size) + }) + if err != nil { + return err + } + return nil +} diff --git a/weed/operation/system_message.pb.go b/weed/operation/system_message.pb.go new file mode 100644 index 000000000..742a1ca4e --- /dev/null +++ b/weed/operation/system_message.pb.go @@ -0,0 +1,203 @@ +// Code generated by protoc-gen-go. +// source: system_message.proto +// DO NOT EDIT! + +/* +Package operation is a generated protocol buffer package. + +It is generated from these files: + system_message.proto + +It has these top-level messages: + VolumeInformationMessage + JoinMessage +*/ +package operation + +import proto "github.com/golang/protobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +type VolumeInformationMessage struct { + Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"` + Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"` + Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` + FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"` + DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"` + DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"` + ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"` + ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"` + Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"` + Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} } +func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) } +func (*VolumeInformationMessage) ProtoMessage() {} + +const Default_VolumeInformationMessage_Version uint32 = 2 + +func (m *VolumeInformationMessage) GetId() uint32 { + if m != nil && m.Id != nil { + return *m.Id + } + return 0 +} + +func (m *VolumeInformationMessage) GetSize() uint64 { + if m != nil && m.Size != nil { + return *m.Size + } + return 0 +} + +func (m *VolumeInformationMessage) GetCollection() string { + if m != nil && m.Collection != nil { + return *m.Collection + } + return "" +} + +func (m *VolumeInformationMessage) GetFileCount() uint64 { + if m != nil && m.FileCount != nil { + return *m.FileCount + } + return 0 +} + +func (m *VolumeInformationMessage) GetDeleteCount() uint64 { + if m != nil && m.DeleteCount != nil { + return *m.DeleteCount + } + return 0 +} + +func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 { + if m != nil && m.DeletedByteCount != nil { + return *m.DeletedByteCount + } + return 0 +} + +func (m *VolumeInformationMessage) GetReadOnly() bool { + if m != nil && m.ReadOnly != nil { + return *m.ReadOnly + } + return false +} + +func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 { + if m != nil && m.ReplicaPlacement != nil { + return *m.ReplicaPlacement + } + return 0 +} + +func (m *VolumeInformationMessage) GetVersion() uint32 { + if m != nil && m.Version != nil { + return *m.Version + } + return Default_VolumeInformationMessage_Version +} + +func (m *VolumeInformationMessage) GetTtl() uint32 { + if m != nil && m.Ttl != nil { + return *m.Ttl + } + return 0 +} + +type JoinMessage struct { + IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"` + Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"` + Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"` + PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"` + MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"` + MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"` + DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"` + Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"` + Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"` + AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *JoinMessage) Reset() { *m = JoinMessage{} } +func (m *JoinMessage) String() string { return proto.CompactTextString(m) } +func (*JoinMessage) ProtoMessage() {} + +func (m *JoinMessage) GetIsInit() bool { + if m != nil && m.IsInit != nil { + return *m.IsInit + } + return false +} + +func (m *JoinMessage) GetIp() string { + if m != nil && m.Ip != nil { + return *m.Ip + } + return "" +} + +func (m *JoinMessage) GetPort() uint32 { + if m != nil && m.Port != nil { + return *m.Port + } + return 0 +} + +func (m *JoinMessage) GetPublicUrl() string { + if m != nil && m.PublicUrl != nil { + return *m.PublicUrl + } + return "" +} + +func (m *JoinMessage) GetMaxVolumeCount() uint32 { + if m != nil && m.MaxVolumeCount != nil { + return *m.MaxVolumeCount + } + return 0 +} + +func (m *JoinMessage) GetMaxFileKey() uint64 { + if m != nil && m.MaxFileKey != nil { + return *m.MaxFileKey + } + return 0 +} + +func (m *JoinMessage) GetDataCenter() string { + if m != nil && m.DataCenter != nil { + return *m.DataCenter + } + return "" +} + +func (m *JoinMessage) GetRack() string { + if m != nil && m.Rack != nil { + return *m.Rack + } + return "" +} + +func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage { + if m != nil { + return m.Volumes + } + return nil +} + +func (m *JoinMessage) GetAdminPort() uint32 { + if m != nil && m.AdminPort != nil { + return *m.AdminPort + } + return 0 +} + +func init() { +} diff --git a/weed/operation/system_message_test.go b/weed/operation/system_message_test.go new file mode 100644 index 000000000..d18ca49a4 --- /dev/null +++ b/weed/operation/system_message_test.go @@ -0,0 +1,59 @@ +package operation + +import ( + "encoding/json" + "log" + "testing" + + "github.com/golang/protobuf/proto" +) + +func TestSerialDeserial(t *testing.T) { + volumeMessage := &VolumeInformationMessage{ + Id: proto.Uint32(12), + Size: proto.Uint64(2341234), + Collection: proto.String("benchmark"), + FileCount: proto.Uint64(2341234), + DeleteCount: proto.Uint64(234), + DeletedByteCount: proto.Uint64(21234), + ReadOnly: proto.Bool(false), + ReplicaPlacement: proto.Uint32(210), + Version: proto.Uint32(2), + } + var volumeMessages []*VolumeInformationMessage + volumeMessages = append(volumeMessages, volumeMessage) + + joinMessage := &JoinMessage{ + IsInit: proto.Bool(true), + Ip: proto.String("127.0.3.12"), + Port: proto.Uint32(34546), + PublicUrl: proto.String("localhost:2342"), + MaxVolumeCount: proto.Uint32(210), + MaxFileKey: proto.Uint64(324234423), + DataCenter: proto.String("dc1"), + Rack: proto.String("rack2"), + Volumes: volumeMessages, + } + + data, err := proto.Marshal(joinMessage) + if err != nil { + log.Fatal("marshaling error: ", err) + } + newMessage := &JoinMessage{} + err = proto.Unmarshal(data, newMessage) + if err != nil { + log.Fatal("unmarshaling error: ", err) + } + log.Println("The pb data size is", len(data)) + + jsonData, jsonError := json.Marshal(joinMessage) + if jsonError != nil { + log.Fatal("json marshaling error: ", jsonError) + } + log.Println("The json data size is", len(jsonData), string(jsonData)) + + // Now test and newTest contain the same data. + if *joinMessage.PublicUrl != *newMessage.PublicUrl { + log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl) + } +} diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go new file mode 100644 index 000000000..a87784cad --- /dev/null +++ b/weed/operation/upload_content.go @@ -0,0 +1,96 @@ +package operation + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "mime" + "mime/multipart" + "net/http" + "net/textproto" + "path/filepath" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" +) + +type UploadResult struct { + Name string `json:"name,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` +} + +var ( + client *http.Client +) + +func init() { + client = &http.Client{Transport: &http.Transport{ + MaxIdleConnsPerHost: 1024, + }} +} + +var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") + +func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) { + return upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = io.Copy(w, reader) + return + }, filename, isGzipped, mtype, jwt) +} +func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) { + body_buf := bytes.NewBufferString("") + body_writer := multipart.NewWriter(body_buf) + h := make(textproto.MIMEHeader) + h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename))) + if mtype == "" { + mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) + } + if mtype != "" { + h.Set("Content-Type", mtype) + } + if isGzipped { + h.Set("Content-Encoding", "gzip") + } + if jwt != "" { + h.Set("Authorization", "BEARER "+string(jwt)) + } + file_writer, cp_err := body_writer.CreatePart(h) + if cp_err != nil { + glog.V(0).Infoln("error creating form file", cp_err.Error()) + return nil, cp_err + } + if err := fillBufferFunction(file_writer); err != nil { + glog.V(0).Infoln("error copying data", err) + return nil, err + } + content_type := body_writer.FormDataContentType() + if err := body_writer.Close(); err != nil { + glog.V(0).Infoln("error closing body", err) + return nil, err + } + resp, post_err := client.Post(uploadUrl, content_type, body_buf) + if post_err != nil { + glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error()) + return nil, post_err + } + defer resp.Body.Close() + resp_body, ra_err := ioutil.ReadAll(resp.Body) + if ra_err != nil { + return nil, ra_err + } + var ret UploadResult + unmarshal_err := json.Unmarshal(resp_body, &ret) + if unmarshal_err != nil { + glog.V(0).Infoln("failing to read upload resonse", uploadUrl, string(resp_body)) + return nil, unmarshal_err + } + if ret.Error != "" { + return nil, errors.New(ret.Error) + } + return &ret, nil +} |
