aboutsummaryrefslogtreecommitdiff
path: root/go/operation
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2016-06-02 18:09:14 -0700
committerChris Lu <chris.lu@gmail.com>2016-06-02 18:09:14 -0700
commit5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44 (patch)
tree2e4dd2ad0a618ab2b7cdebcdb9c503526c31e2e8 /go/operation
parentcaeffa3998adc060fa66c4cd77af971ff2d26c57 (diff)
downloadseaweedfs-5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44.tar.xz
seaweedfs-5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44.zip
directory structure change to work with glide
glide has its own requirements. My previous workaround caused me some code checkin errors. Need to fix this.
Diffstat (limited to 'go/operation')
-rw-r--r--go/operation/assign_file_id.go48
-rw-r--r--go/operation/chunked_file.go213
-rw-r--r--go/operation/compress.go59
-rw-r--r--go/operation/data_struts.go7
-rw-r--r--go/operation/delete_content.go117
-rw-r--r--go/operation/list_masters.go32
-rw-r--r--go/operation/lookup.go118
-rw-r--r--go/operation/lookup_vid_cache.go51
-rw-r--r--go/operation/lookup_vid_cache_test.go26
-rw-r--r--go/operation/submit.go194
-rw-r--r--go/operation/sync_volume.go54
-rw-r--r--go/operation/system_message.pb.go203
-rw-r--r--go/operation/system_message_test.go59
-rw-r--r--go/operation/upload_content.go96
14 files changed, 0 insertions, 1277 deletions
diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go
deleted file mode 100644
index fa436b651..000000000
--- a/go/operation/assign_file_id.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package operation
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "net/url"
- "strconv"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type AssignResult struct {
- Fid string `json:"fid,omitempty"`
- Url string `json:"url,omitempty"`
- PublicUrl string `json:"publicUrl,omitempty"`
- Count uint64 `json:"count,omitempty"`
- Error string `json:"error,omitempty"`
-}
-
-func Assign(server string, count uint64, replication string, collection string, ttl string) (*AssignResult, error) {
- values := make(url.Values)
- values.Add("count", strconv.FormatUint(count, 10))
- if replication != "" {
- values.Add("replication", replication)
- }
- if collection != "" {
- values.Add("collection", collection)
- }
- if ttl != "" {
- values.Add("ttl", ttl)
- }
- jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
- glog.V(2).Info("assign result :", string(jsonBlob))
- if err != nil {
- return nil, err
- }
- var ret AssignResult
- err = json.Unmarshal(jsonBlob, &ret)
- if err != nil {
- return nil, fmt.Errorf("/dir/assign result JSON unmarshal error:%v, json:%s", err, string(jsonBlob))
- }
- if ret.Count <= 0 {
- return nil, errors.New(ret.Error)
- }
- return &ret, nil
-}
diff --git a/go/operation/chunked_file.go b/go/operation/chunked_file.go
deleted file mode 100644
index 786b8a989..000000000
--- a/go/operation/chunked_file.go
+++ /dev/null
@@ -1,213 +0,0 @@
-package operation
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net/http"
- "sort"
-
- "sync"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-var (
- // when the remote server does not allow range requests (Accept-Ranges was not set)
- ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server")
- // ErrInvalidRange is returned by Read when trying to read past the end of the file
- ErrInvalidRange = errors.New("Invalid range")
-)
-
-type ChunkInfo struct {
- Fid string `json:"fid"`
- Offset int64 `json:"offset"`
- Size int64 `json:"size"`
-}
-
-type ChunkList []*ChunkInfo
-
-type ChunkManifest struct {
- Name string `json:"name,omitempty"`
- Mime string `json:"mime,omitempty"`
- Size int64 `json:"size,omitempty"`
- Chunks ChunkList `json:"chunks,omitempty"`
-}
-
-// seekable chunked file reader
-type ChunkedFileReader struct {
- Manifest *ChunkManifest
- Master string
- pos int64
- pr *io.PipeReader
- pw *io.PipeWriter
- mutex sync.Mutex
-}
-
-func (s ChunkList) Len() int { return len(s) }
-func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
-func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
-
-func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
- if isGzipped {
- var err error
- if buffer, err = UnGzipData(buffer); err != nil {
- return nil, err
- }
- }
- cm := ChunkManifest{}
- if e := json.Unmarshal(buffer, &cm); e != nil {
- return nil, e
- }
- sort.Sort(cm.Chunks)
- return &cm, nil
-}
-
-func (cm *ChunkManifest) Marshal() ([]byte, error) {
- return json.Marshal(cm)
-}
-
-func (cm *ChunkManifest) DeleteChunks(master string) error {
- deleteError := 0
- for _, ci := range cm.Chunks {
- if e := DeleteFile(master, ci.Fid, ""); e != nil {
- deleteError++
- glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master)
- }
- }
- if deleteError > 0 {
- return errors.New("Not all chunks deleted.")
- }
- return nil
-}
-
-func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) {
- req, err := http.NewRequest("GET", fileUrl, nil)
- if err != nil {
- return written, err
- }
- if offset > 0 {
- req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
- }
-
- resp, err := util.Do(req)
- if err != nil {
- return written, err
- }
- defer resp.Body.Close()
-
- switch resp.StatusCode {
- case http.StatusRequestedRangeNotSatisfiable:
- return written, ErrInvalidRange
- case http.StatusOK:
- if offset > 0 {
- return written, ErrRangeRequestsNotSupported
- }
- case http.StatusPartialContent:
- break
- default:
- return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl)
-
- }
- return io.Copy(w, resp.Body)
-}
-
-func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
- var err error
- switch whence {
- case 0:
- case 1:
- offset += cf.pos
- case 2:
- offset = cf.Manifest.Size - offset
- }
- if offset > cf.Manifest.Size {
- err = ErrInvalidRange
- }
- if cf.pos != offset {
- cf.Close()
- }
- cf.pos = offset
- return cf.pos, err
-}
-
-func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
- cm := cf.Manifest
- chunkIndex := -1
- chunkStartOffset := int64(0)
- for i, ci := range cm.Chunks {
- if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
- chunkIndex = i
- chunkStartOffset = cf.pos - ci.Offset
- break
- }
- }
- if chunkIndex < 0 {
- return n, ErrInvalidRange
- }
- for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
- ci := cm.Chunks[chunkIndex]
- // if we need read date from local volume server first?
- fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid)
- if lookupError != nil {
- return n, lookupError
- }
- if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset); e != nil {
- return n, e
- } else {
- n += wn
- cf.pos += wn
- }
-
- chunkStartOffset = 0
- }
- return n, nil
-}
-
-func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) {
- cf.Seek(off, 0)
- return cf.Read(p)
-}
-
-func (cf *ChunkedFileReader) Read(p []byte) (int, error) {
- return cf.getPipeReader().Read(p)
-}
-
-func (cf *ChunkedFileReader) Close() (e error) {
- cf.mutex.Lock()
- defer cf.mutex.Unlock()
- return cf.closePipe()
-}
-
-func (cf *ChunkedFileReader) closePipe() (e error) {
- if cf.pr != nil {
- if err := cf.pr.Close(); err != nil {
- e = err
- }
- }
- cf.pr = nil
- if cf.pw != nil {
- if err := cf.pw.Close(); err != nil {
- e = err
- }
- }
- cf.pw = nil
- return e
-}
-
-func (cf *ChunkedFileReader) getPipeReader() io.Reader {
- cf.mutex.Lock()
- defer cf.mutex.Unlock()
- if cf.pr != nil && cf.pw != nil {
- return cf.pr
- }
- cf.closePipe()
- cf.pr, cf.pw = io.Pipe()
- go func(pw *io.PipeWriter) {
- _, e := cf.WriteTo(pw)
- pw.CloseWithError(e)
- }(cf.pw)
- return cf.pr
-}
diff --git a/go/operation/compress.go b/go/operation/compress.go
deleted file mode 100644
index b1105ba4b..000000000
--- a/go/operation/compress.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package operation
-
-import (
- "bytes"
- "compress/flate"
- "compress/gzip"
- "io/ioutil"
- "strings"
-
- "github.com/chrislusf/seaweedfs/go/glog"
-)
-
-/*
-* Default more not to gzip since gzip can be done on client side.
- */
-func IsGzippable(ext, mtype string) bool {
- if strings.HasPrefix(mtype, "text/") {
- return true
- }
- switch ext {
- case ".zip", ".rar", ".gz", ".bz2", ".xz":
- return false
- case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json":
- return true
- }
- if strings.HasPrefix(mtype, "application/") {
- if strings.HasSuffix(mtype, "xml") {
- return true
- }
- if strings.HasSuffix(mtype, "script") {
- return true
- }
- }
- return false
-}
-
-func GzipData(input []byte) ([]byte, error) {
- buf := new(bytes.Buffer)
- w, _ := gzip.NewWriterLevel(buf, flate.BestCompression)
- if _, err := w.Write(input); err != nil {
- glog.V(2).Infoln("error compressing data:", err)
- return nil, err
- }
- if err := w.Close(); err != nil {
- glog.V(2).Infoln("error closing compressed data:", err)
- return nil, err
- }
- return buf.Bytes(), nil
-}
-func UnGzipData(input []byte) ([]byte, error) {
- buf := bytes.NewBuffer(input)
- r, _ := gzip.NewReader(buf)
- defer r.Close()
- output, err := ioutil.ReadAll(r)
- if err != nil {
- glog.V(2).Infoln("error uncompressing data:", err)
- }
- return output, err
-}
diff --git a/go/operation/data_struts.go b/go/operation/data_struts.go
deleted file mode 100644
index bfc53aa50..000000000
--- a/go/operation/data_struts.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package operation
-
-type JoinResult struct {
- VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
- SecretKey string `json:"secretKey,omitempty"`
- Error string `json:"error,omitempty"`
-}
diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go
deleted file mode 100644
index c808cc75a..000000000
--- a/go/operation/delete_content.go
+++ /dev/null
@@ -1,117 +0,0 @@
-package operation
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "net/url"
- "strings"
- "sync"
-
- "net/http"
-
- "github.com/chrislusf/seaweedfs/go/security"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type DeleteResult struct {
- Fid string `json:"fid"`
- Size int `json:"size"`
- Status int `json:"status"`
- Error string `json:"error,omitempty"`
-}
-
-func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
- fileUrl, err := LookupFileId(master, fileId)
- if err != nil {
- return fmt.Errorf("Failed to lookup %s:%v", fileId, err)
- }
- err = util.Delete(fileUrl, jwt)
- if err != nil {
- return fmt.Errorf("Failed to delete %s:%v", fileUrl, err)
- }
- return nil
-}
-
-func ParseFileId(fid string) (vid string, key_cookie string, err error) {
- commaIndex := strings.Index(fid, ",")
- if commaIndex <= 0 {
- return "", "", errors.New("Wrong fid format.")
- }
- return fid[:commaIndex], fid[commaIndex+1:], nil
-}
-
-type DeleteFilesResult struct {
- Errors []string
- Results []DeleteResult
-}
-
-func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) {
- vid_to_fileIds := make(map[string][]string)
- ret := &DeleteFilesResult{}
- var vids []string
- for _, fileId := range fileIds {
- vid, _, err := ParseFileId(fileId)
- if err != nil {
- ret.Results = append(ret.Results, DeleteResult{
- Fid: vid,
- Status: http.StatusBadRequest,
- Error: err.Error()},
- )
- continue
- }
- if _, ok := vid_to_fileIds[vid]; !ok {
- vid_to_fileIds[vid] = make([]string, 0)
- vids = append(vids, vid)
- }
- vid_to_fileIds[vid] = append(vid_to_fileIds[vid], fileId)
- }
-
- lookupResults, err := LookupVolumeIds(master, vids)
- if err != nil {
- return ret, err
- }
-
- server_to_fileIds := make(map[string][]string)
- for vid, result := range lookupResults {
- if result.Error != "" {
- ret.Errors = append(ret.Errors, result.Error)
- continue
- }
- for _, location := range result.Locations {
- if _, ok := server_to_fileIds[location.Url]; !ok {
- server_to_fileIds[location.Url] = make([]string, 0)
- }
- server_to_fileIds[location.Url] = append(
- server_to_fileIds[location.Url], vid_to_fileIds[vid]...)
- }
- }
-
- var wg sync.WaitGroup
-
- for server, fidList := range server_to_fileIds {
- wg.Add(1)
- go func(server string, fidList []string) {
- defer wg.Done()
- values := make(url.Values)
- for _, fid := range fidList {
- values.Add("fid", fid)
- }
- jsonBlob, err := util.Post("http://"+server+"/delete", values)
- if err != nil {
- ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob))
- return
- }
- var result []DeleteResult
- err = json.Unmarshal(jsonBlob, &result)
- if err != nil {
- ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob))
- return
- }
- ret.Results = append(ret.Results, result...)
- }(server, fidList)
- }
- wg.Wait()
-
- return ret, nil
-}
diff --git a/go/operation/list_masters.go b/go/operation/list_masters.go
deleted file mode 100644
index bda6f3c65..000000000
--- a/go/operation/list_masters.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package operation
-
-import (
- "encoding/json"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type ClusterStatusResult struct {
- IsLeader bool `json:"IsLeader,omitempty"`
- Leader string `json:"Leader,omitempty"`
- Peers []string `json:"Peers,omitempty"`
-}
-
-func ListMasters(server string) ([]string, error) {
- jsonBlob, err := util.Get("http://" + server + "/cluster/status")
- glog.V(2).Info("list masters result :", string(jsonBlob))
- if err != nil {
- return nil, err
- }
- var ret ClusterStatusResult
- err = json.Unmarshal(jsonBlob, &ret)
- if err != nil {
- return nil, err
- }
- masters := ret.Peers
- if ret.IsLeader {
- masters = append(masters, ret.Leader)
- }
- return masters, nil
-}
diff --git a/go/operation/lookup.go b/go/operation/lookup.go
deleted file mode 100644
index f77d1ec9b..000000000
--- a/go/operation/lookup.go
+++ /dev/null
@@ -1,118 +0,0 @@
-package operation
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "math/rand"
- "net/url"
- "strings"
- "time"
-
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type Location struct {
- Url string `json:"url,omitempty"`
- PublicUrl string `json:"publicUrl,omitempty"`
-}
-type LookupResult struct {
- VolumeId string `json:"volumeId,omitempty"`
- Locations []Location `json:"locations,omitempty"`
- Error string `json:"error,omitempty"`
-}
-
-func (lr *LookupResult) String() string {
- return fmt.Sprintf("VolumeId:%s, Locations:%v, Error:%s", lr.VolumeId, lr.Locations, lr.Error)
-}
-
-var (
- vc VidCache // caching of volume locations, re-check if after 10 minutes
-)
-
-func Lookup(server string, vid string) (ret *LookupResult, err error) {
- locations, cache_err := vc.Get(vid)
- if cache_err != nil {
- if ret, err = do_lookup(server, vid); err == nil {
- vc.Set(vid, ret.Locations, 10*time.Minute)
- }
- } else {
- ret = &LookupResult{VolumeId: vid, Locations: locations}
- }
- return
-}
-
-func do_lookup(server string, vid string) (*LookupResult, error) {
- values := make(url.Values)
- values.Add("volumeId", vid)
- jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
- if err != nil {
- return nil, err
- }
- var ret LookupResult
- err = json.Unmarshal(jsonBlob, &ret)
- if err != nil {
- return nil, err
- }
- if ret.Error != "" {
- return nil, errors.New(ret.Error)
- }
- return &ret, nil
-}
-
-func LookupFileId(server string, fileId string) (fullUrl string, err error) {
- parts := strings.Split(fileId, ",")
- if len(parts) != 2 {
- return "", errors.New("Invalid fileId " + fileId)
- }
- lookup, lookupError := Lookup(server, parts[0])
- if lookupError != nil {
- return "", lookupError
- }
- if len(lookup.Locations) == 0 {
- return "", errors.New("File Not Found")
- }
- return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil
-}
-
-// LookupVolumeIds find volume locations by cache and actual lookup
-func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) {
- ret := make(map[string]LookupResult)
- var unknown_vids []string
-
- //check vid cache first
- for _, vid := range vids {
- locations, cache_err := vc.Get(vid)
- if cache_err == nil {
- ret[vid] = LookupResult{VolumeId: vid, Locations: locations}
- } else {
- unknown_vids = append(unknown_vids, vid)
- }
- }
- //return success if all volume ids are known
- if len(unknown_vids) == 0 {
- return ret, nil
- }
-
- //only query unknown_vids
- values := make(url.Values)
- for _, vid := range unknown_vids {
- values.Add("volumeId", vid)
- }
- jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values)
- if err != nil {
- return nil, err
- }
- err = json.Unmarshal(jsonBlob, &ret)
- if err != nil {
- return nil, errors.New(err.Error() + " " + string(jsonBlob))
- }
-
- //set newly checked vids to cache
- for _, vid := range unknown_vids {
- locations := ret[vid].Locations
- vc.Set(vid, locations, 10*time.Minute)
- }
-
- return ret, nil
-}
diff --git a/go/operation/lookup_vid_cache.go b/go/operation/lookup_vid_cache.go
deleted file mode 100644
index ac4240102..000000000
--- a/go/operation/lookup_vid_cache.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package operation
-
-import (
- "errors"
- "strconv"
- "time"
-
- "github.com/chrislusf/seaweedfs/go/glog"
-)
-
-type VidInfo struct {
- Locations []Location
- NextRefreshTime time.Time
-}
-type VidCache struct {
- cache []VidInfo
-}
-
-func (vc *VidCache) Get(vid string) ([]Location, error) {
- id, err := strconv.Atoi(vid)
- if err != nil {
- glog.V(1).Infof("Unknown volume id %s", vid)
- return nil, err
- }
- if 0 < id && id <= len(vc.cache) {
- if vc.cache[id-1].Locations == nil {
- return nil, errors.New("Not Set")
- }
- if vc.cache[id-1].NextRefreshTime.Before(time.Now()) {
- return nil, errors.New("Expired")
- }
- return vc.cache[id-1].Locations, nil
- }
- return nil, errors.New("Not Found")
-}
-func (vc *VidCache) Set(vid string, locations []Location, duration time.Duration) {
- id, err := strconv.Atoi(vid)
- if err != nil {
- glog.V(1).Infof("Unknown volume id %s", vid)
- return
- }
- if id > len(vc.cache) {
- for i := id - len(vc.cache); i > 0; i-- {
- vc.cache = append(vc.cache, VidInfo{})
- }
- }
- if id > 0 {
- vc.cache[id-1].Locations = locations
- vc.cache[id-1].NextRefreshTime = time.Now().Add(duration)
- }
-}
diff --git a/go/operation/lookup_vid_cache_test.go b/go/operation/lookup_vid_cache_test.go
deleted file mode 100644
index 9c9e2affb..000000000
--- a/go/operation/lookup_vid_cache_test.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package operation
-
-import (
- "fmt"
- "testing"
- "time"
-)
-
-func TestCaching(t *testing.T) {
- var (
- vc VidCache
- )
- var locations []Location
- locations = append(locations, Location{Url: "a.com:8080"})
- vc.Set("123", locations, time.Second)
- ret, _ := vc.Get("123")
- if ret == nil {
- t.Fatal("Not found vid 123")
- }
- fmt.Printf("vid 123 locations = %v\n", ret)
- time.Sleep(2 * time.Second)
- ret, _ = vc.Get("123")
- if ret != nil {
- t.Fatal("Not found vid 123")
- }
-}
diff --git a/go/operation/submit.go b/go/operation/submit.go
deleted file mode 100644
index 18484680a..000000000
--- a/go/operation/submit.go
+++ /dev/null
@@ -1,194 +0,0 @@
-package operation
-
-import (
- "bytes"
- "io"
- "mime"
- "net/url"
- "os"
- "path"
- "strconv"
- "strings"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/security"
-)
-
-type FilePart struct {
- Reader io.Reader
- FileName string
- FileSize int64
- IsGzipped bool
- MimeType string
- ModTime int64 //in seconds
- Replication string
- Collection string
- Ttl string
- Server string //this comes from assign result
- Fid string //this comes from assign result, but customizable
-}
-
-type SubmitResult struct {
- FileName string `json:"fileName,omitempty"`
- FileUrl string `json:"fileUrl,omitempty"`
- Fid string `json:"fid,omitempty"`
- Size uint32 `json:"size,omitempty"`
- Error string `json:"error,omitempty"`
-}
-
-func SubmitFiles(master string, files []FilePart,
- replication string, collection string, ttl string, maxMB int,
- secret security.Secret,
-) ([]SubmitResult, error) {
- results := make([]SubmitResult, len(files))
- for index, file := range files {
- results[index].FileName = file.FileName
- }
- ret, err := Assign(master, uint64(len(files)), replication, collection, ttl)
- if err != nil {
- for index, _ := range files {
- results[index].Error = err.Error()
- }
- return results, err
- }
- for index, file := range files {
- file.Fid = ret.Fid
- if index > 0 {
- file.Fid = file.Fid + "_" + strconv.Itoa(index)
- }
- file.Server = ret.Url
- file.Replication = replication
- file.Collection = collection
- results[index].Size, err = file.Upload(maxMB, master, secret)
- if err != nil {
- results[index].Error = err.Error()
- }
- results[index].Fid = file.Fid
- results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
- }
- return results, nil
-}
-
-func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
- ret = make([]FilePart, len(fullPathFilenames))
- for index, file := range fullPathFilenames {
- if ret[index], err = newFilePart(file); err != nil {
- return
- }
- }
- return
-}
-func newFilePart(fullPathFilename string) (ret FilePart, err error) {
- fh, openErr := os.Open(fullPathFilename)
- if openErr != nil {
- glog.V(0).Info("Failed to open file: ", fullPathFilename)
- return ret, openErr
- }
- ret.Reader = fh
-
- if fi, fiErr := fh.Stat(); fiErr != nil {
- glog.V(0).Info("Failed to stat file:", fullPathFilename)
- return ret, fiErr
- } else {
- ret.ModTime = fi.ModTime().UTC().Unix()
- ret.FileSize = fi.Size()
- }
- ext := strings.ToLower(path.Ext(fullPathFilename))
- ret.IsGzipped = ext == ".gz"
- if ret.IsGzipped {
- ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3]
- }
- ret.FileName = fullPathFilename
- if ext != "" {
- ret.MimeType = mime.TypeByExtension(ext)
- }
-
- return ret, nil
-}
-
-func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
- jwt := security.GenJwt(secret, fi.Fid)
- fileUrl := "http://" + fi.Server + "/" + fi.Fid
- if fi.ModTime != 0 {
- fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
- }
- if closer, ok := fi.Reader.(io.Closer); ok {
- defer closer.Close()
- }
- baseName := path.Base(fi.FileName)
- if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
- chunkSize := int64(maxMB * 1024 * 1024)
- chunks := fi.FileSize/chunkSize + 1
- cm := ChunkManifest{
- Name: baseName,
- Size: fi.FileSize,
- Mime: fi.MimeType,
- Chunks: make([]*ChunkInfo, 0, chunks),
- }
-
- for i := int64(0); i < chunks; i++ {
- id, count, e := upload_one_chunk(
- baseName+"-"+strconv.FormatInt(i+1, 10),
- io.LimitReader(fi.Reader, chunkSize),
- master, fi.Replication, fi.Collection, fi.Ttl,
- jwt)
- if e != nil {
- // delete all uploaded chunks
- cm.DeleteChunks(master)
- return 0, e
- }
- cm.Chunks = append(cm.Chunks,
- &ChunkInfo{
- Offset: i * chunkSize,
- Size: int64(count),
- Fid: id,
- },
- )
- retSize += count
- }
- err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
- if err != nil {
- // delete all uploaded chunks
- cm.DeleteChunks(master)
- }
- } else {
- ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
- if e != nil {
- return 0, e
- }
- return ret.Size, e
- }
- return
-}
-
-func upload_one_chunk(filename string, reader io.Reader, master,
- replication string, collection string, ttl string, jwt security.EncodedJwt,
-) (fid string, size uint32, e error) {
- ret, err := Assign(master, 1, replication, collection, ttl)
- if err != nil {
- return "", 0, err
- }
- fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
- glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
- uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
- "application/octet-stream", jwt)
- if uploadError != nil {
- return fid, 0, uploadError
- }
- return fid, uploadResult.Size, nil
-}
-
-func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
- buf, e := manifest.Marshal()
- if e != nil {
- return e
- }
- bufReader := bytes.NewReader(buf)
- glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
- u, _ := url.Parse(fileUrl)
- q := u.Query()
- q.Set("cm", "true")
- u.RawQuery = q.Encode()
- _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt)
- return e
-}
diff --git a/go/operation/sync_volume.go b/go/operation/sync_volume.go
deleted file mode 100644
index 54944a64e..000000000
--- a/go/operation/sync_volume.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package operation
-
-import (
- "encoding/json"
- "fmt"
- "net/url"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type SyncVolumeResponse struct {
- Replication string `json:"Replication,omitempty"`
- Ttl string `json:"Ttl,omitempty"`
- TailOffset uint64 `json:"TailOffset,omitempty"`
- CompactRevision uint16 `json:"CompactRevision,omitempty"`
- IdxFileSize uint64 `json:"IdxFileSize,omitempty"`
- Error string `json:"error,omitempty"`
-}
-
-func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) {
- values := make(url.Values)
- values.Add("volume", vid)
- jsonBlob, err := util.Post("http://"+server+"/admin/sync/status", values)
- glog.V(2).Info("sync volume result :", string(jsonBlob))
- if err != nil {
- return nil, err
- }
- var ret SyncVolumeResponse
- err = json.Unmarshal(jsonBlob, &ret)
- if err != nil {
- return nil, err
- }
- if ret.Error != "" {
- return nil, fmt.Errorf("Volume %s get sync status error: %s", vid, ret.Error)
- }
- return &ret, nil
-}
-
-func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key uint64, offset, size uint32)) error {
- values := make(url.Values)
- values.Add("volume", vid)
- line := make([]byte, 16)
- err := util.GetBufferStream("http://"+server+"/admin/sync/index", values, line, func(bytes []byte) {
- key := util.BytesToUint64(bytes[:8])
- offset := util.BytesToUint32(bytes[8:12])
- size := util.BytesToUint32(bytes[12:16])
- eachEntryFn(key, offset, size)
- })
- if err != nil {
- return err
- }
- return nil
-}
diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go
deleted file mode 100644
index 742a1ca4e..000000000
--- a/go/operation/system_message.pb.go
+++ /dev/null
@@ -1,203 +0,0 @@
-// Code generated by protoc-gen-go.
-// source: system_message.proto
-// DO NOT EDIT!
-
-/*
-Package operation is a generated protocol buffer package.
-
-It is generated from these files:
- system_message.proto
-
-It has these top-level messages:
- VolumeInformationMessage
- JoinMessage
-*/
-package operation
-
-import proto "github.com/golang/protobuf/proto"
-import math "math"
-
-// Reference imports to suppress errors if they are not otherwise used.
-var _ = proto.Marshal
-var _ = math.Inf
-
-type VolumeInformationMessage struct {
- Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"`
- Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"`
- Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
- FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"`
- DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"`
- DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"`
- ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
- ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"`
- Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
- Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
- XXX_unrecognized []byte `json:"-"`
-}
-
-func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} }
-func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) }
-func (*VolumeInformationMessage) ProtoMessage() {}
-
-const Default_VolumeInformationMessage_Version uint32 = 2
-
-func (m *VolumeInformationMessage) GetId() uint32 {
- if m != nil && m.Id != nil {
- return *m.Id
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetSize() uint64 {
- if m != nil && m.Size != nil {
- return *m.Size
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetCollection() string {
- if m != nil && m.Collection != nil {
- return *m.Collection
- }
- return ""
-}
-
-func (m *VolumeInformationMessage) GetFileCount() uint64 {
- if m != nil && m.FileCount != nil {
- return *m.FileCount
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetDeleteCount() uint64 {
- if m != nil && m.DeleteCount != nil {
- return *m.DeleteCount
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 {
- if m != nil && m.DeletedByteCount != nil {
- return *m.DeletedByteCount
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetReadOnly() bool {
- if m != nil && m.ReadOnly != nil {
- return *m.ReadOnly
- }
- return false
-}
-
-func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 {
- if m != nil && m.ReplicaPlacement != nil {
- return *m.ReplicaPlacement
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetVersion() uint32 {
- if m != nil && m.Version != nil {
- return *m.Version
- }
- return Default_VolumeInformationMessage_Version
-}
-
-func (m *VolumeInformationMessage) GetTtl() uint32 {
- if m != nil && m.Ttl != nil {
- return *m.Ttl
- }
- return 0
-}
-
-type JoinMessage struct {
- IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"`
- Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"`
- Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"`
- PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"`
- MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"`
- MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"`
- DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"`
- Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
- Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
- AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"`
- XXX_unrecognized []byte `json:"-"`
-}
-
-func (m *JoinMessage) Reset() { *m = JoinMessage{} }
-func (m *JoinMessage) String() string { return proto.CompactTextString(m) }
-func (*JoinMessage) ProtoMessage() {}
-
-func (m *JoinMessage) GetIsInit() bool {
- if m != nil && m.IsInit != nil {
- return *m.IsInit
- }
- return false
-}
-
-func (m *JoinMessage) GetIp() string {
- if m != nil && m.Ip != nil {
- return *m.Ip
- }
- return ""
-}
-
-func (m *JoinMessage) GetPort() uint32 {
- if m != nil && m.Port != nil {
- return *m.Port
- }
- return 0
-}
-
-func (m *JoinMessage) GetPublicUrl() string {
- if m != nil && m.PublicUrl != nil {
- return *m.PublicUrl
- }
- return ""
-}
-
-func (m *JoinMessage) GetMaxVolumeCount() uint32 {
- if m != nil && m.MaxVolumeCount != nil {
- return *m.MaxVolumeCount
- }
- return 0
-}
-
-func (m *JoinMessage) GetMaxFileKey() uint64 {
- if m != nil && m.MaxFileKey != nil {
- return *m.MaxFileKey
- }
- return 0
-}
-
-func (m *JoinMessage) GetDataCenter() string {
- if m != nil && m.DataCenter != nil {
- return *m.DataCenter
- }
- return ""
-}
-
-func (m *JoinMessage) GetRack() string {
- if m != nil && m.Rack != nil {
- return *m.Rack
- }
- return ""
-}
-
-func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage {
- if m != nil {
- return m.Volumes
- }
- return nil
-}
-
-func (m *JoinMessage) GetAdminPort() uint32 {
- if m != nil && m.AdminPort != nil {
- return *m.AdminPort
- }
- return 0
-}
-
-func init() {
-}
diff --git a/go/operation/system_message_test.go b/go/operation/system_message_test.go
deleted file mode 100644
index d18ca49a4..000000000
--- a/go/operation/system_message_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package operation
-
-import (
- "encoding/json"
- "log"
- "testing"
-
- "github.com/golang/protobuf/proto"
-)
-
-func TestSerialDeserial(t *testing.T) {
- volumeMessage := &VolumeInformationMessage{
- Id: proto.Uint32(12),
- Size: proto.Uint64(2341234),
- Collection: proto.String("benchmark"),
- FileCount: proto.Uint64(2341234),
- DeleteCount: proto.Uint64(234),
- DeletedByteCount: proto.Uint64(21234),
- ReadOnly: proto.Bool(false),
- ReplicaPlacement: proto.Uint32(210),
- Version: proto.Uint32(2),
- }
- var volumeMessages []*VolumeInformationMessage
- volumeMessages = append(volumeMessages, volumeMessage)
-
- joinMessage := &JoinMessage{
- IsInit: proto.Bool(true),
- Ip: proto.String("127.0.3.12"),
- Port: proto.Uint32(34546),
- PublicUrl: proto.String("localhost:2342"),
- MaxVolumeCount: proto.Uint32(210),
- MaxFileKey: proto.Uint64(324234423),
- DataCenter: proto.String("dc1"),
- Rack: proto.String("rack2"),
- Volumes: volumeMessages,
- }
-
- data, err := proto.Marshal(joinMessage)
- if err != nil {
- log.Fatal("marshaling error: ", err)
- }
- newMessage := &JoinMessage{}
- err = proto.Unmarshal(data, newMessage)
- if err != nil {
- log.Fatal("unmarshaling error: ", err)
- }
- log.Println("The pb data size is", len(data))
-
- jsonData, jsonError := json.Marshal(joinMessage)
- if jsonError != nil {
- log.Fatal("json marshaling error: ", jsonError)
- }
- log.Println("The json data size is", len(jsonData), string(jsonData))
-
- // Now test and newTest contain the same data.
- if *joinMessage.PublicUrl != *newMessage.PublicUrl {
- log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl)
- }
-}
diff --git a/go/operation/upload_content.go b/go/operation/upload_content.go
deleted file mode 100644
index c3fd5f4b1..000000000
--- a/go/operation/upload_content.go
+++ /dev/null
@@ -1,96 +0,0 @@
-package operation
-
-import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "mime"
- "mime/multipart"
- "net/http"
- "net/textproto"
- "path/filepath"
- "strings"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/security"
-)
-
-type UploadResult struct {
- Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
- Error string `json:"error,omitempty"`
-}
-
-var (
- client *http.Client
-)
-
-func init() {
- client = &http.Client{Transport: &http.Transport{
- MaxIdleConnsPerHost: 1024,
- }}
-}
-
-var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
-
-func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
- return upload_content(uploadUrl, func(w io.Writer) (err error) {
- _, err = io.Copy(w, reader)
- return
- }, filename, isGzipped, mtype, jwt)
-}
-func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
- body_buf := bytes.NewBufferString("")
- body_writer := multipart.NewWriter(body_buf)
- h := make(textproto.MIMEHeader)
- h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename)))
- if mtype == "" {
- mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename)))
- }
- if mtype != "" {
- h.Set("Content-Type", mtype)
- }
- if isGzipped {
- h.Set("Content-Encoding", "gzip")
- }
- if jwt != "" {
- h.Set("Authorization", "BEARER "+string(jwt))
- }
- file_writer, cp_err := body_writer.CreatePart(h)
- if cp_err != nil {
- glog.V(0).Infoln("error creating form file", cp_err.Error())
- return nil, cp_err
- }
- if err := fillBufferFunction(file_writer); err != nil {
- glog.V(0).Infoln("error copying data", err)
- return nil, err
- }
- content_type := body_writer.FormDataContentType()
- if err := body_writer.Close(); err != nil {
- glog.V(0).Infoln("error closing body", err)
- return nil, err
- }
- resp, post_err := client.Post(uploadUrl, content_type, body_buf)
- if post_err != nil {
- glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
- return nil, post_err
- }
- defer resp.Body.Close()
- resp_body, ra_err := ioutil.ReadAll(resp.Body)
- if ra_err != nil {
- return nil, ra_err
- }
- var ret UploadResult
- unmarshal_err := json.Unmarshal(resp_body, &ret)
- if unmarshal_err != nil {
- glog.V(0).Infoln("failing to read upload resonse", uploadUrl, string(resp_body))
- return nil, unmarshal_err
- }
- if ret.Error != "" {
- return nil, errors.New(ret.Error)
- }
- return &ret, nil
-}