diff options
Diffstat (limited to 'go')
98 files changed, 1178 insertions, 589 deletions
diff --git a/go/filer/cassandra_store/cassandra_store.go b/go/filer/cassandra_store/cassandra_store.go new file mode 100644 index 000000000..83face686 --- /dev/null +++ b/go/filer/cassandra_store/cassandra_store.go @@ -0,0 +1,87 @@ +package cassandra_store + +import ( + "fmt" + + "github.com/chrislusf/weed-fs/go/glog" + + "github.com/gocql/gocql" +) + +/* + +Basically you need a table just like this: + +CREATE TABLE seaweed_files ( + path varchar, + fids list<varchar>, + PRIMARY KEY (path) +); + +Need to match flat_namespace.FlatNamespaceStore interface + Put(fullFileName string, fid string) (err error) + Get(fullFileName string) (fid string, err error) + Delete(fullFileName string) (fid string, err error) + +*/ +type CassandraStore struct { + cluster *gocql.ClusterConfig + session *gocql.Session +} + +func NewCassandraStore(keyspace string, hosts ...string) (c *CassandraStore, err error) { + c = &CassandraStore{} + c.cluster = gocql.NewCluster(hosts...) + c.cluster.Keyspace = keyspace + c.cluster.Consistency = gocql.Quorum + c.session, err = c.cluster.CreateSession() + if err != nil { + glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) + } + return +} + +func (c *CassandraStore) Put(fullFileName string, fid string) (err error) { + var input []string + input = append(input, fid) + if err := c.session.Query( + `INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`, + fullFileName, input).Exec(); err != nil { + glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err) + return err + } + return nil +} +func (c *CassandraStore) Get(fullFileName string) (fid string, err error) { + var output []string + if err := c.session.Query( + `select fids FROM seaweed_files WHERE path = ? LIMIT 1`, + fullFileName).Consistency(gocql.One).Scan(&output); err != nil { + if err != gocql.ErrNotFound { + glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err) + } + } + if len(output) == 0 { + return "", fmt.Errorf("No file id found for %s", fullFileName) + } + return output[0], nil +} + +// Currently the fid is not returned +func (c *CassandraStore) Delete(fullFileName string) (fid string, err error) { + if err := c.session.Query( + `DELETE FROM seaweed_files WHERE path = ?`, + fullFileName).Exec(); err != nil { + if err != gocql.ErrNotFound { + glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err) + } + return "", err + } + return "", nil +} + +func (c *CassandraStore) Close() { + if c.session != nil { + c.session.Close() + } +} diff --git a/go/filer/cassandra_store/schema.cql b/go/filer/cassandra_store/schema.cql new file mode 100644 index 000000000..d6f2bb093 --- /dev/null +++ b/go/filer/cassandra_store/schema.cql @@ -0,0 +1,22 @@ +/* + +Here is the CQL to create the table.CassandraStore + +Optionally you can adjust the keyspace name and replication settings. + +For production server, very likely you want to set replication_factor to 3 + +*/ + +create keyspace seaweed WITH replication = { + 'class':'SimpleStrategy', + 'replication_factor':1 +}; + +use seaweed; + +CREATE TABLE seaweed_files ( + path varchar, + fids list<varchar>, + PRIMARY KEY (path) +); diff --git a/go/filer/client_operations.go b/go/filer/client_operations.go index 0b006289f..b38368735 100644 --- a/go/filer/client_operations.go +++ b/go/filer/client_operations.go @@ -1,12 +1,12 @@ package filer -import () - import ( - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "fmt" + + "github.com/chrislusf/weed-fs/go/util" + "net/url" ) diff --git a/go/filer/directory.go b/go/filer/directory.go deleted file mode 100644 index 956a2f504..000000000 --- a/go/filer/directory.go +++ /dev/null @@ -1,20 +0,0 @@ -package filer - -import () - -type DirectoryId int32 - -type DirectoryEntry struct { - Name string //dir name without path - Id DirectoryId -} - -type DirectoryManager interface { - FindDirectory(dirPath string) (DirectoryId, error) - ListDirectories(dirPath string) (dirs []DirectoryEntry, err error) - MakeDirectory(currentDirPath string, dirName string) (DirectoryId, error) - MoveUnderDirectory(oldDirPath string, newParentDirPath string) error - DeleteDirectory(dirPath string) error - //functions used by FUSE - FindDirectoryById(DirectoryId, error) -} diff --git a/go/filer/design.txt b/go/filer/embedded_filer/design.txt index 45fec8fbe..45fec8fbe 100644 --- a/go/filer/design.txt +++ b/go/filer/embedded_filer/design.txt diff --git a/go/filer/embedded_filer/directory.go b/go/filer/embedded_filer/directory.go new file mode 100644 index 000000000..8ab3e4aff --- /dev/null +++ b/go/filer/embedded_filer/directory.go @@ -0,0 +1,15 @@ +package embedded_filer + +import ( + "github.com/chrislusf/weed-fs/go/filer" +) + +type DirectoryManager interface { + FindDirectory(dirPath string) (filer.DirectoryId, error) + ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) + MakeDirectory(currentDirPath string, dirName string) (filer.DirectoryId, error) + MoveUnderDirectory(oldDirPath string, newParentDirPath string) error + DeleteDirectory(dirPath string) error + //functions used by FUSE + FindDirectoryById(filer.DirectoryId, error) +} diff --git a/go/filer/directory_in_map.go b/go/filer/embedded_filer/directory_in_map.go index 1d88a78be..a1d0f43bd 100644 --- a/go/filer/directory_in_map.go +++ b/go/filer/embedded_filer/directory_in_map.go @@ -1,8 +1,7 @@ -package filer +package embedded_filer import ( "bufio" - "github.com/chrislusf/weed-fs/go/util" "fmt" "io" "os" @@ -10,6 +9,9 @@ import ( "strconv" "strings" "sync" + + "github.com/chrislusf/weed-fs/go/filer" + "github.com/chrislusf/weed-fs/go/util" ) var writeLock sync.Mutex //serialize changes to dir.log @@ -18,12 +20,12 @@ type DirectoryEntryInMap struct { Name string Parent *DirectoryEntryInMap SubDirectories map[string]*DirectoryEntryInMap - Id DirectoryId + Id filer.DirectoryId } type DirectoryManagerInMap struct { Root *DirectoryEntryInMap - max DirectoryId + max filer.DirectoryId logFile *os.File isLoading bool } @@ -82,7 +84,7 @@ func (dm *DirectoryManagerInMap) processEachLine(line string) error { if pe != nil { return pe } - if e := dm.loadDirectory(parts[1], DirectoryId(v)); e != nil { + if e := dm.loadDirectory(parts[1], filer.DirectoryId(v)); e != nil { return e } case "mov": @@ -141,7 +143,7 @@ func (dm *DirectoryManagerInMap) findDirectory(dirPath string) (*DirectoryEntryI } return dir, nil } -func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (DirectoryId, error) { +func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (filer.DirectoryId, error) { d, e := dm.findDirectory(dirPath) if e == nil { return d.Id, nil @@ -149,7 +151,7 @@ func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (DirectoryId, err return dm.Root.Id, e } -func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId DirectoryId) error { +func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId filer.DirectoryId) error { dirPath = filepath.Clean(dirPath) if dirPath == "/" { return nil @@ -200,7 +202,7 @@ func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEn return dir, created } -func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (DirectoryId, error) { +func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (filer.DirectoryId, error) { dir, _ := dm.makeDirectory(dirPath) return dir.Id, nil } @@ -227,13 +229,13 @@ func (dm *DirectoryManagerInMap) MoveUnderDirectory(oldDirPath string, newParent return nil } -func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []DirectoryEntry, err error) { +func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []filer.DirectoryEntry, err error) { d, e := dm.findDirectory(dirPath) if e != nil { return dirNames, e } for k, v := range d.SubDirectories { - dirNames = append(dirNames, DirectoryEntry{Name: k, Id: v.Id}) + dirNames = append(dirNames, filer.DirectoryEntry{Name: k, Id: v.Id}) } return dirNames, nil } diff --git a/go/filer/directory_test.go b/go/filer/embedded_filer/directory_test.go index ea4903f03..cb0090bf2 100644 --- a/go/filer/directory_test.go +++ b/go/filer/embedded_filer/directory_test.go @@ -1,4 +1,4 @@ -package filer +package embedded_filer import ( "os" diff --git a/go/filer/filer_embedded.go b/go/filer/embedded_filer/filer_embedded.go index 3d3dac941..4321bb4e6 100644 --- a/go/filer/filer_embedded.go +++ b/go/filer/embedded_filer/filer_embedded.go @@ -1,11 +1,13 @@ -package filer +package embedded_filer import ( - "github.com/chrislusf/weed-fs/go/operation" "errors" "fmt" "path/filepath" "strings" + + "github.com/chrislusf/weed-fs/go/filer" + "github.com/chrislusf/weed-fs/go/operation" ) type FilerEmbedded struct { @@ -47,13 +49,13 @@ func (filer *FilerEmbedded) FindFile(filePath string) (fid string, err error) { } return filer.files.FindFile(dirId, file) } -func (filer *FilerEmbedded) FindDirectory(dirPath string) (dirId DirectoryId, err error) { +func (filer *FilerEmbedded) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { return filer.directories.FindDirectory(dirPath) } -func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []DirectoryEntry, err error) { +func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { return filer.directories.ListDirectories(dirPath) } -func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error) { +func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { dirId, e := filer.directories.FindDirectory(dirPath) if e != nil { return nil, e diff --git a/go/filer/files_in_leveldb.go b/go/filer/embedded_filer/files_in_leveldb.go index 41fbc74bd..c4fab734f 100644 --- a/go/filer/files_in_leveldb.go +++ b/go/filer/embedded_filer/files_in_leveldb.go @@ -1,7 +1,9 @@ -package filer +package embedded_filer import ( "bytes" + + "github.com/chrislusf/weed-fs/go/filer" "github.com/chrislusf/weed-fs/go/glog" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" @@ -26,7 +28,7 @@ func NewFileListInLevelDb(dir string) (fl *FileListInLevelDb, err error) { return } -func genKey(dirId DirectoryId, fileName string) []byte { +func genKey(dirId filer.DirectoryId, fileName string) []byte { ret := make([]byte, 0, 4+len(fileName)) for i := 3; i >= 0; i-- { ret = append(ret, byte(dirId>>(uint(i)*8))) @@ -35,25 +37,25 @@ func genKey(dirId DirectoryId, fileName string) []byte { return ret } -func (fl *FileListInLevelDb) CreateFile(dirId DirectoryId, fileName string, fid string) (err error) { +func (fl *FileListInLevelDb) CreateFile(dirId filer.DirectoryId, fileName string, fid string) (err error) { glog.V(4).Infoln("directory", dirId, "fileName", fileName, "fid", fid) return fl.db.Put(genKey(dirId, fileName), []byte(fid), nil) } -func (fl *FileListInLevelDb) DeleteFile(dirId DirectoryId, fileName string) (fid string, err error) { +func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { if fid, err = fl.FindFile(dirId, fileName); err != nil { return } err = fl.db.Delete(genKey(dirId, fileName), nil) return fid, err } -func (fl *FileListInLevelDb) FindFile(dirId DirectoryId, fileName string) (fid string, err error) { +func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { data, e := fl.db.Get(genKey(dirId, fileName), nil) if e != nil { return "", e } return string(data), nil } -func (fl *FileListInLevelDb) ListFiles(dirId DirectoryId, lastFileName string, limit int) (files []FileEntry) { +func (fl *FileListInLevelDb) ListFiles(dirId filer.DirectoryId, lastFileName string, limit int) (files []filer.FileEntry) { glog.V(4).Infoln("directory", dirId, "lastFileName", lastFileName, "limit", limit) dirKey := genKey(dirId, "") iter := fl.db.NewIterator(&util.Range{Start: genKey(dirId, lastFileName)}, nil) @@ -73,7 +75,7 @@ func (fl *FileListInLevelDb) ListFiles(dirId DirectoryId, lastFileName string, l break } } - files = append(files, FileEntry{Name: fileName, Id: FileId(string(iter.Value()))}) + files = append(files, filer.FileEntry{Name: fileName, Id: filer.FileId(string(iter.Value()))}) } iter.Release() return diff --git a/go/filer/filer.go b/go/filer/filer.go index de877fc1f..5a2584c4a 100644 --- a/go/filer/filer.go +++ b/go/filer/filer.go @@ -1,7 +1,5 @@ package filer -import () - type FileId string //file id on weedfs type FileEntry struct { @@ -9,13 +7,22 @@ type FileEntry struct { Id FileId `json:"fid,omitempty"` } +type DirectoryId int32 + +type DirectoryEntry struct { + Name string //dir name without path + Id DirectoryId +} + type Filer interface { - CreateFile(filePath string, fid string) (err error) - FindFile(filePath string) (fid string, err error) + CreateFile(fullFileName string, fid string) (err error) + FindFile(fullFileName string) (fid string, err error) + DeleteFile(fullFileName string) (fid string, err error) + + //Optional functions. embedded filer support these FindDirectory(dirPath string) (dirId DirectoryId, err error) ListDirectories(dirPath string) (dirs []DirectoryEntry, err error) ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error) DeleteDirectory(dirPath string, recursive bool) (err error) - DeleteFile(filePath string) (fid string, err error) Move(fromPath string, toPath string) (err error) } diff --git a/go/filer/flat_namespace/flat_namespace_filer.go b/go/filer/flat_namespace/flat_namespace_filer.go new file mode 100644 index 000000000..021b809d6 --- /dev/null +++ b/go/filer/flat_namespace/flat_namespace_filer.go @@ -0,0 +1,50 @@ +package flat_namespace + +import ( + "errors" + + "github.com/chrislusf/weed-fs/go/filer" +) + +type FlatNamesapceFiler struct { + master string + store FlatNamespaceStore +} + +var ( + NotImplemented = errors.New("Not Implemented for flat namespace meta data store!") +) + +func NewFlatNamesapceFiler(master string, store FlatNamespaceStore) *FlatNamesapceFiler { + return &FlatNamesapceFiler{ + master: master, + store: store, + } +} + +func (filer *FlatNamesapceFiler) CreateFile(fullFileName string, fid string) (err error) { + return filer.store.Put(fullFileName, fid) +} +func (filer *FlatNamesapceFiler) FindFile(fullFileName string) (fid string, err error) { + return filer.store.Get(fullFileName) +} +func (filer *FlatNamesapceFiler) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + return 0, NotImplemented +} +func (filer *FlatNamesapceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { + return nil, NotImplemented +} +func (filer *FlatNamesapceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { + return nil, NotImplemented +} +func (filer *FlatNamesapceFiler) DeleteDirectory(dirPath string, recursive bool) (err error) { + return NotImplemented +} + +func (filer *FlatNamesapceFiler) DeleteFile(fullFileName string) (fid string, err error) { + return filer.store.Delete(fullFileName) +} + +func (filer *FlatNamesapceFiler) Move(fromPath string, toPath string) error { + return NotImplemented +} diff --git a/go/filer/flat_namespace/flat_namespace_store.go b/go/filer/flat_namespace/flat_namespace_store.go new file mode 100644 index 000000000..832b70e40 --- /dev/null +++ b/go/filer/flat_namespace/flat_namespace_store.go @@ -0,0 +1,9 @@ +package flat_namespace + +import () + +type FlatNamespaceStore interface { + Put(fullFileName string, fid string) (err error) + Get(fullFileName string) (fid string, err error) + Delete(fullFileName string) (fid string, err error) +} diff --git a/go/filer/redis_store/redis_store.go b/go/filer/redis_store/redis_store.go new file mode 100644 index 000000000..8fe1c7ca5 --- /dev/null +++ b/go/filer/redis_store/redis_store.go @@ -0,0 +1,48 @@ +package redis_store + +import ( + redis "gopkg.in/redis.v2" +) + +type RedisStore struct { + Client *redis.Client +} + +func NewRedisStore(hostPort string, database int) *RedisStore { + client := redis.NewTCPClient(&redis.Options{ + Addr: hostPort, + Password: "", // no password set + DB: int64(database), + }) + return &RedisStore{Client: client} +} + +func (s *RedisStore) Get(fullFileName string) (fid string, err error) { + fid, err = s.Client.Get(fullFileName).Result() + if err == redis.Nil { + err = nil + } + return fid, err +} +func (s *RedisStore) Put(fullFileName string, fid string) (err error) { + _, err = s.Client.Set(fullFileName, fid).Result() + if err == redis.Nil { + err = nil + } + return err +} + +// Currently the fid is not returned +func (s *RedisStore) Delete(fullFileName string) (fid string, err error) { + _, err = s.Client.Del(fullFileName).Result() + if err == redis.Nil { + err = nil + } + return "", err +} + +func (c *RedisStore) Close() { + if c.Client != nil { + c.Client.Close() + } +} diff --git a/go/glog/convenient_api.go b/go/glog/convenient_api.go index 3c378083f..cb43d60e2 100644 --- a/go/glog/convenient_api.go +++ b/go/glog/convenient_api.go @@ -1,7 +1,5 @@ package glog -import () - /* Copying the original glog because it is missing several convenient methods. 1. remove nano time in log format diff --git a/go/images/orientation.go b/go/images/orientation.go index 41ed3f0af..4bff89311 100644 --- a/go/images/orientation.go +++ b/go/images/orientation.go @@ -2,11 +2,12 @@ package images import ( "bytes" - "github.com/rwcarlsen/goexif/exif" "image" "image/draw" "image/jpeg" "log" + + "github.com/rwcarlsen/goexif/exif" ) //many code is copied from http://camlistore.org/pkg/images/images.go diff --git a/go/images/resizing.go b/go/images/resizing.go index 08a1e15d2..e9de5f7d7 100644 --- a/go/images/resizing.go +++ b/go/images/resizing.go @@ -2,11 +2,12 @@ package images import ( "bytes" - "github.com/disintegration/imaging" "image" "image/gif" "image/jpeg" "image/png" + + "github.com/disintegration/imaging" ) func Resized(ext string, data []byte, width, height int) (resized []byte, w int, h int) { diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go index 4e72ad939..672bfa99c 100644 --- a/go/operation/assign_file_id.go +++ b/go/operation/assign_file_id.go @@ -1,12 +1,13 @@ package operation import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "net/url" "strconv" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" ) type AssignResult struct { diff --git a/go/operation/data_struts.go b/go/operation/data_struts.go index 09fab05d1..4980f9913 100644 --- a/go/operation/data_struts.go +++ b/go/operation/data_struts.go @@ -1,7 +1,5 @@ package operation -import () - type JoinResult struct { VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"` Error string `json:"error,omitempty"` diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go index 84391b634..416a852b3 100644 --- a/go/operation/delete_content.go +++ b/go/operation/delete_content.go @@ -1,12 +1,13 @@ package operation import ( - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "net/url" "strings" "sync" + + "github.com/chrislusf/weed-fs/go/util" ) type DeleteResult struct { diff --git a/go/operation/list_masters.go b/go/operation/list_masters.go index 7d46a9ebc..7ada94243 100644 --- a/go/operation/list_masters.go +++ b/go/operation/list_masters.go @@ -1,9 +1,10 @@ package operation import ( + "encoding/json" + "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/util" - "encoding/json" ) type ClusterStatusResult struct { diff --git a/go/operation/lookup.go b/go/operation/lookup.go index ebf153d27..70bc7146e 100644 --- a/go/operation/lookup.go +++ b/go/operation/lookup.go @@ -1,14 +1,15 @@ package operation import ( - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" - _ "fmt" + "fmt" "math/rand" "net/url" "strings" "time" + + "github.com/chrislusf/weed-fs/go/util" ) type Location struct { @@ -21,6 +22,10 @@ type LookupResult struct { Error string `json:"error,omitempty"` } +func (lr *LookupResult) String() string { + return fmt.Sprintf("VolumeId:%s, Locations:%v, Error:%s", lr.VolumeId, lr.Locations, lr.Error) +} + var ( vc VidCache ) diff --git a/go/operation/submit.go b/go/operation/submit.go index 3e09c2edf..62db46617 100644 --- a/go/operation/submit.go +++ b/go/operation/submit.go @@ -2,13 +2,14 @@ package operation import ( "bytes" - "github.com/chrislusf/weed-fs/go/glog" "io" "mime" "os" "path" "strconv" "strings" + + "github.com/chrislusf/weed-fs/go/glog" ) type FilePart struct { diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go index 9f00dd74d..6f0f974c5 100644 --- a/go/operation/system_message.pb.go +++ b/go/operation/system_message.pb.go @@ -14,7 +14,7 @@ It has these top-level messages: */ package operation -import proto "code.google.com/p/goprotobuf/proto" +import "github.com/golang/protobuf/proto" import math "math" // Reference imports to suppress errors if they are not otherwise used. diff --git a/go/operation/system_message_test.go b/go/operation/system_message_test.go index 2731d0b2f..d18ca49a4 100644 --- a/go/operation/system_message_test.go +++ b/go/operation/system_message_test.go @@ -1,10 +1,11 @@ package operation import ( - proto "code.google.com/p/goprotobuf/proto" "encoding/json" "log" "testing" + + "github.com/golang/protobuf/proto" ) func TestSerialDeserial(t *testing.T) { diff --git a/go/operation/upload_content.go b/go/operation/upload_content.go index 38737702d..480d76dca 100644 --- a/go/operation/upload_content.go +++ b/go/operation/upload_content.go @@ -2,7 +2,6 @@ package operation import ( "bytes" - "github.com/chrislusf/weed-fs/go/glog" "encoding/json" "errors" "fmt" @@ -14,6 +13,8 @@ import ( "net/textproto" "path/filepath" "strings" + + "github.com/chrislusf/weed-fs/go/glog" ) type UploadResult struct { diff --git a/go/security/guard.go b/go/security/guard.go new file mode 100644 index 000000000..a2beb48f4 --- /dev/null +++ b/go/security/guard.go @@ -0,0 +1,146 @@ +package security + +import ( + "errors" + "fmt" + "net" + "net/http" + "strings" + "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/dgrijalva/jwt-go" +) + +var ( + ErrUnauthorized = errors.New("unauthorized token") +) + +/* +Guard is to ensure data access security. +There are 2 ways to check access: +1. white list. It's checking request ip address. +2. JSON Web Token(JWT) generated from secretKey. + The jwt can come from: + 1. url parameter jwt=... + 2. request header "Authorization" + 3. cookie with the name "jwt" + +The white list is checked first because it is easy. +Then the JWT is checked. + +The Guard will also check these claims if provided: +1. "exp" Expiration Time +2. "nbf" Not Before + +Generating JWT: +1. use HS256 to sign +2. optionally set "exp", "nbf" fields, in Unix time, + the number of seconds elapsed since January 1, 1970 UTC. + +Referenced: +https://github.com/pkieltyka/jwtauth/blob/master/jwtauth.go + +*/ +type Guard struct { + whiteList []string + secretKey string + + isActive bool +} + +func NewGuard(whiteList []string, secretKey string) *Guard { + g := &Guard{whiteList: whiteList, secretKey: secretKey} + g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0 + return g +} + +func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { + if !g.isActive { + //if no security needed, just skip all checkings + return f + } + return func(w http.ResponseWriter, r *http.Request) { + if err := g.doCheck(w, r); err != nil { + w.WriteHeader(http.StatusUnauthorized) + return + } + f(w, r) + } +} + +func (g *Guard) NewToken() (tokenString string, err error) { + m := make(map[string]interface{}) + m["exp"] = time.Now().Unix() + 10 + return g.Encode(m) +} + +func (g *Guard) Encode(claims map[string]interface{}) (tokenString string, err error) { + if !g.isActive { + return "", nil + } + + t := jwt.New(jwt.GetSigningMethod("HS256")) + t.Claims = claims + return t.SignedString(g.secretKey) +} + +func (g *Guard) Decode(tokenString string) (token *jwt.Token, err error) { + return jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) { + return g.secretKey, nil + }) +} + +func (g *Guard) doCheck(w http.ResponseWriter, r *http.Request) error { + if len(g.whiteList) != 0 { + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err == nil { + for _, ip := range g.whiteList { + if ip == host { + return nil + } + } + } + } + + if len(g.secretKey) != 0 { + + // Get token from query params + tokenStr := r.URL.Query().Get("jwt") + + // Get token from authorization header + if tokenStr == "" { + bearer := r.Header.Get("Authorization") + if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" { + tokenStr = bearer[7:] + } + } + + // Get token from cookie + if tokenStr == "" { + cookie, err := r.Cookie("jwt") + if err == nil { + tokenStr = cookie.Value + } + } + + if tokenStr == "" { + return ErrUnauthorized + } + + // Verify the token + token, err := g.Decode(tokenStr) + if err != nil { + glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err) + return ErrUnauthorized + } + if !token.Valid { + glog.V(1).Infof("Token invliad from %s: %v", r.RemoteAddr, tokenStr) + return ErrUnauthorized + } + + } + + glog.V(1).Infof("No permission from %s", r.RemoteAddr) + return fmt.Errorf("No write permisson from %s", r.RemoteAddr) +} diff --git a/go/sequence/sequence.go b/go/sequence/sequence.go index 5a1bceaaf..1aa167b6b 100644 --- a/go/sequence/sequence.go +++ b/go/sequence/sequence.go @@ -1,7 +1,5 @@ package sequence -import () - type Sequencer interface { NextFileId(count int) (uint64, int) SetMax(uint64) diff --git a/go/stats/disk.go b/go/stats/disk.go index d5275e571..46d8c465e 100644 --- a/go/stats/disk.go +++ b/go/stats/disk.go @@ -1,7 +1,5 @@ package stats -import () - type DiskStatus struct { Dir string All uint64 diff --git a/go/stats/disk_notsupported.go b/go/stats/disk_notsupported.go index 37f9bc47d..e380d27ea 100644 --- a/go/stats/disk_notsupported.go +++ b/go/stats/disk_notsupported.go @@ -2,8 +2,6 @@ package stats -import () - func (disk *DiskStatus) fillInStatus() { return } diff --git a/go/stats/memory_notsupported.go b/go/stats/memory_notsupported.go index 64c3d7c2f..ba8229364 100644 --- a/go/stats/memory_notsupported.go +++ b/go/stats/memory_notsupported.go @@ -2,8 +2,6 @@ package stats -import () - func (mem *MemStatus) fillInStatus() { return } diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go index 1869a563e..fbb59e9c0 100644 --- a/go/storage/cdb_map.go +++ b/go/storage/cdb_map.go @@ -1,13 +1,14 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "fmt" - "github.com/tgulacsi/go-cdb" "os" "path/filepath" + + "github.com/chrislusf/weed-fs/go/util" + "github.com/tgulacsi/go-cdb" ) // CDB-backed read-only needle map diff --git a/go/storage/cdb_map_test.go b/go/storage/cdb_map_test.go index cff7dfa61..ed690f44f 100644 --- a/go/storage/cdb_map_test.go +++ b/go/storage/cdb_map_test.go @@ -1,11 +1,12 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/glog" "math/rand" "os" "runtime" "testing" + + "github.com/chrislusf/weed-fs/go/glog" ) var testIndexFilename string = "../../test/sample.idx" diff --git a/go/storage/compact_map.go b/go/storage/compact_map.go index 9cfc3e8f7..6ac18b012 100644 --- a/go/storage/compact_map.go +++ b/go/storage/compact_map.go @@ -1,7 +1,5 @@ package storage -import () - type NeedleValue struct { Key Key Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G diff --git a/go/storage/compact_map_perf_test.go b/go/storage/compact_map_perf_test.go index ef43de25b..f74684225 100644 --- a/go/storage/compact_map_perf_test.go +++ b/go/storage/compact_map_perf_test.go @@ -1,11 +1,12 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/util" "log" "os" "testing" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" ) func TestMemoryUsage(t *testing.T) { diff --git a/go/storage/compress.go b/go/storage/compress.go index a353c9d3a..5efc9e1f1 100644 --- a/go/storage/compress.go +++ b/go/storage/compress.go @@ -2,11 +2,12 @@ package storage import ( "bytes" - "github.com/chrislusf/weed-fs/go/glog" "compress/flate" "compress/gzip" "io/ioutil" "strings" + + "github.com/chrislusf/weed-fs/go/glog" ) /* diff --git a/go/storage/crc.go b/go/storage/crc.go index 7aa400959..af25b9e53 100644 --- a/go/storage/crc.go +++ b/go/storage/crc.go @@ -1,9 +1,10 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/util" "fmt" "hash/crc32" + + "github.com/chrislusf/weed-fs/go/util" ) var table = crc32.MakeTable(crc32.Castagnoli) diff --git a/go/storage/file_id.go b/go/storage/file_id.go index ec566826c..f6e36a98c 100644 --- a/go/storage/file_id.go +++ b/go/storage/file_id.go @@ -1,11 +1,12 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/util" "encoding/hex" "errors" "strings" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" ) type FileId struct { diff --git a/go/storage/needle.go b/go/storage/needle.go index daede321b..11610dd80 100644 --- a/go/storage/needle.go +++ b/go/storage/needle.go @@ -1,11 +1,9 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/images" - "github.com/chrislusf/weed-fs/go/util" "encoding/hex" "errors" + "fmt" "io/ioutil" "mime" "net/http" @@ -13,6 +11,10 @@ import ( "strconv" "strings" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/images" + "github.com/chrislusf/weed-fs/go/util" ) const ( @@ -23,6 +25,7 @@ const ( ) /* +* A Needle means a uploaded and stored file. * Needle file size is limited to 4GB for now. */ type Needle struct { @@ -44,6 +47,11 @@ type Needle struct { Padding []byte `comment:"Aligned to 8 bytes"` } +func (n *Needle) String() (str string) { + str = fmt.Sprintf("Cookie:%d, Id:%d, Size:%d, DataSize:%d, Name: %s, Mime: %s", n.Cookie, n.Id, n.Size, n.DataSize, n.Name, n.Mime) + return +} + func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) { form, fe := r.MultipartReader() if fe != nil { diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index dca2e6c5d..504ca1552 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -1,11 +1,12 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/util" "fmt" "io" "os" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" ) type NeedleMapper interface { diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index bf452ba37..663b5abbd 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -1,12 +1,13 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/util" "errors" "fmt" "io" "os" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" ) const ( diff --git a/go/storage/store.go b/go/storage/store.go index e7a9dac94..65eed1d0e 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -1,10 +1,6 @@ package storage import ( - proto "code.google.com/p/goprotobuf/proto" - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/operation" - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "fmt" @@ -12,6 +8,11 @@ import ( "math/rand" "strconv" "strings" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/util" + "github.com/golang/protobuf/proto" ) const ( @@ -32,6 +33,10 @@ type MasterNodes struct { lastNode int } +func (mn *MasterNodes) String() string { + return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode) +} + func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) { mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1} return @@ -64,6 +69,9 @@ func (mn *MasterNodes) findMaster() (string, error) { return mn.nodes[mn.lastNode], nil } +/* + * A VolumeServer contains one Store + */ type Store struct { Port int Ip string @@ -76,6 +84,11 @@ type Store struct { masterNodes *MasterNodes } +func (s *Store) String() (str string) { + str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes) + return +} + func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) { s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} s.Locations = make([]*DiskLocation, 0) diff --git a/go/storage/store_vacuum.go b/go/storage/store_vacuum.go index 3527e4f59..209e3b4b3 100644 --- a/go/storage/store_vacuum.go +++ b/go/storage/store_vacuum.go @@ -1,9 +1,10 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/glog" "fmt" "strconv" + + "github.com/chrislusf/weed-fs/go/glog" ) func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { diff --git a/go/storage/volume.go b/go/storage/volume.go index de79e9107..a1eccd62c 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -2,7 +2,6 @@ package storage import ( "bytes" - "github.com/chrislusf/weed-fs/go/glog" "errors" "fmt" "io" @@ -10,6 +9,8 @@ import ( "path" "sync" "time" + + "github.com/chrislusf/weed-fs/go/glog" ) type Volume struct { @@ -32,6 +33,10 @@ func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement e = v.load(true, true) return } +func (v *Volume) String() string { + return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) +} + func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{} @@ -72,7 +77,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { if e != nil { if !os.IsPermission(e) { - return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error()) + return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e) } } @@ -92,12 +97,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { if v.readOnly { glog.V(1).Infoln("open to read file", fileName+".idx") if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil { - return fmt.Errorf("cannot read Volume Index %s.idx: %s", fileName, e.Error()) + return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e) } } else { glog.V(1).Infoln("open to write file", fileName+".idx") if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil { - return fmt.Errorf("cannot write Volume Index %s.idx: %s", fileName, e.Error()) + return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly) @@ -115,7 +120,7 @@ func (v *Volume) Size() int64 { if e == nil { return stat.Size() } - glog.V(0).Infof("Failed to read file size %s %s", v.dataFile.Name(), e.Error()) + glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e) return -1 } func (v *Volume) Close() { @@ -134,7 +139,7 @@ func (v *Volume) isFileUnchanged(n *Needle) bool { oldNeedle := new(Needle) oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) { - n.Size = oldNeedle.Size + n.DataSize = oldNeedle.DataSize return true } } @@ -164,12 +169,13 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { v.accessLock.Lock() defer v.accessLock.Unlock() if v.isFileUnchanged(n) { - size = n.Size + size = n.DataSize glog.V(4).Infof("needle is unchanged!") return } var offset int64 if offset, err = v.dataFile.Seek(0, 2); err != nil { + glog.V(0).Infof("faile to seek the end of file: %v", err) return } @@ -177,21 +183,21 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { if offset%NeedlePaddingSize != 0 { offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) if offset, err = v.dataFile.Seek(offset, 0); err != nil { - glog.V(4).Infof("failed to align in datafile %s: %s", v.dataFile.Name(), err.Error()) + glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) return } } if size, err = n.Append(v.dataFile, v.Version()); err != nil { if e := v.dataFile.Truncate(offset); e != nil { - err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile.Name(), e.Error()) + err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e) } return } nv, ok := v.nm.Get(n.Id) if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { - glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error()) + glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) } } if v.lastModifiedTime < n.LastModified { @@ -292,13 +298,13 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, offset := int64(SuperBlockSize) n, rest, e := ReadNeedleHeader(v.dataFile, version, offset) if e != nil { - err = fmt.Errorf("cannot read needle header: %s", e) + err = fmt.Errorf("cannot read needle header: %v", e) return } for n != nil { if readNeedleBody { if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil { - err = fmt.Errorf("cannot read needle body: %s", err) + err = fmt.Errorf("cannot read needle body: %v", err) return } } @@ -310,7 +316,7 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, if err == io.EOF { return nil } - return fmt.Errorf("cannot read needle header: %s", err) + return fmt.Errorf("cannot read needle header: %v", err) } } @@ -360,7 +366,7 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { defer indexFile.Close() glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName) if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil { - glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error()) + glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e) return false } return true diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index 6410c1784..bc8049ea4 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -1,6 +1,7 @@ package storage import ( + "fmt" "github.com/chrislusf/weed-fs/go/operation" ) @@ -36,3 +37,7 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er vi.Ttl = LoadTTLFromUint32(*m.Ttl) return vi, nil } + +func (vi VolumeInfo) String() string { + return fmt.Sprintf("Id:%s, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly) +} diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go index a7e86b1c3..57e0deea9 100644 --- a/go/storage/volume_super_block.go +++ b/go/storage/volume_super_block.go @@ -1,9 +1,10 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/glog" "fmt" "os" + + "github.com/chrislusf/weed-fs/go/glog" ) const ( diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index b348434d2..7e026a61d 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -1,10 +1,11 @@ package storage import ( - "github.com/chrislusf/weed-fs/go/glog" "fmt" "os" "time" + + "github.com/chrislusf/weed-fs/go/glog" ) func (v *Volume) garbageLevel() float64 { diff --git a/go/storage/volume_version.go b/go/storage/volume_version.go index 9702ae904..2e9f58aa2 100644 --- a/go/storage/volume_version.go +++ b/go/storage/volume_version.go @@ -1,7 +1,5 @@ package storage -import () - type Version uint8 const ( diff --git a/go/tools/read_index.go b/go/tools/read_index.go index b99c5b6b8..1104dc348 100644 --- a/go/tools/read_index.go +++ b/go/tools/read_index.go @@ -1,11 +1,12 @@ package main import ( - "github.com/chrislusf/weed-fs/go/storage" "flag" "fmt" "log" "os" + + "github.com/chrislusf/weed-fs/go/storage" ) var ( diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go index 6562e9ac5..a791b4c1c 100644 --- a/go/topology/allocate_volume.go +++ b/go/topology/allocate_volume.go @@ -1,11 +1,12 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/storage" - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "net/url" + + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" ) type AllocateVolumeResult struct { diff --git a/go/topology/collection.go b/go/topology/collection.go index 506f43fbf..5437ffd79 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -1,36 +1,43 @@ package topology import ( + "fmt" + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" ) type Collection struct { Name string volumeSizeLimit uint64 - storageType2VolumeLayout map[string]*VolumeLayout + storageType2VolumeLayout *util.ConcurrentReadMap } func NewCollection(name string, volumeSizeLimit uint64) *Collection { c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} - c.storageType2VolumeLayout = make(map[string]*VolumeLayout) + c.storageType2VolumeLayout = util.NewConcurrentReadMap() return c } +func (c *Collection) String() string { + return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) +} + func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { keyString := rp.String() if ttl != nil { keyString += ttl.String() } - if c.storageType2VolumeLayout[keyString] == nil { - c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit) - } - return c.storageType2VolumeLayout[keyString] + vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { + return NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + }) + return vl.(*VolumeLayout) } func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range c.storageType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { - if list := vl.Lookup(vid); list != nil { + if list := vl.(*VolumeLayout).Lookup(vid); list != nil { return list } } @@ -39,9 +46,9 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { } func (c *Collection) ListVolumeServers() (nodes []*DataNode) { - for _, vl := range c.storageType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { - if list := vl.ListVolumeServers(); list != nil { + if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil { nodes = append(nodes, list...) } } diff --git a/go/topology/data_center.go b/go/topology/data_center.go index ebd07803b..bcf2dfd31 100644 --- a/go/topology/data_center.go +++ b/go/topology/data_center.go @@ -1,7 +1,5 @@ package topology -import () - type DataCenter struct { NodeImpl } diff --git a/go/topology/data_node.go b/go/topology/data_node.go index c3b90470f..09b9fac6c 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -1,9 +1,11 @@ package topology import ( + "fmt" + "strconv" + "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/storage" - "strconv" ) type DataNode struct { @@ -25,6 +27,10 @@ func NewDataNode(id string) *DataNode { return s } +func (dn *DataNode) String() string { + return fmt.Sprintf("NodeImpl:%s ,volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) +} + func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { if _, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v diff --git a/go/topology/node.go b/go/topology/node.go index 54118802e..10955fa72 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -1,11 +1,12 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "errors" "math/rand" "strings" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" ) type NodeId string diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go index 6ea019bd8..0c52f9d30 100644 --- a/go/topology/store_replicate.go +++ b/go/topology/store_replicate.go @@ -2,12 +2,13 @@ package topology import ( "bytes" + "net/http" + "strconv" + "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/util" - "net/http" - "strconv" ) func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) { diff --git a/go/topology/topology.go b/go/topology/topology.go index c90e8de0b..c2073ed2f 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -1,20 +1,22 @@ package topology import ( + "errors" + "io/ioutil" + "math/rand" + "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/sequence" "github.com/chrislusf/weed-fs/go/storage" - "errors" + "github.com/chrislusf/weed-fs/go/util" "github.com/goraft/raft" - "io/ioutil" - "math/rand" ) type Topology struct { NodeImpl - collectionMap map[string]*Collection + collectionMap *util.ConcurrentReadMap pulse int64 @@ -37,7 +39,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.nodeType = "Topology" t.NodeImpl.value = t t.children = make(map[NodeId]Node) - t.collectionMap = make(map[string]*Collection) + t.collectionMap = util.NewConcurrentReadMap() t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit @@ -89,14 +91,14 @@ func (t *Topology) loadConfiguration(configurationFile string) error { func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { //maybe an issue if lots of collections? if collection == "" { - for _, c := range t.collectionMap { - if list := c.Lookup(vid); list != nil { + for _, c := range t.collectionMap.Items { + if list := c.(*Collection).Lookup(vid); list != nil { return list } } } else { - if c, ok := t.collectionMap[collection]; ok { - return c.Lookup(vid) + if c, ok := t.collectionMap.Items[collection]; ok { + return c.(*Collection).Lookup(vid) } } return nil @@ -109,7 +111,7 @@ func (t *Topology) NextVolumeId() storage.VolumeId { return next } -func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool { +func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } @@ -124,20 +126,18 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in } func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { - _, ok := t.collectionMap[collectionName] - if !ok { - t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) - } - return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl) + return t.collectionMap.Get(collectionName, func() interface{} { + return NewCollection(collectionName, t.volumeSizeLimit) + }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) } -func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) { - collection, ok = t.collectionMap[collectionName] - return +func (t *Topology) GetCollection(collectionName string) (*Collection, bool) { + c, hasCollection := t.collectionMap.Items[collectionName] + return c.(*Collection), hasCollection } func (t *Topology) DeleteCollection(collectionName string) { - delete(t.collectionMap, collectionName) + delete(t.collectionMap.Items, collectionName) } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index eb4491484..7e36568b6 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -1,10 +1,11 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "math/rand" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" ) func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go index d6400c988..6a1423ca8 100644 --- a/go/topology/topology_map.go +++ b/go/topology/topology_map.go @@ -1,7 +1,5 @@ package topology -import () - func (t *Topology) ToMap() interface{} { m := make(map[string]interface{}) m["Max"] = t.GetMaxVolumeCount() @@ -13,10 +11,11 @@ func (t *Topology) ToMap() interface{} { } m["DataCenters"] = dcs var layouts []interface{} - for _, c := range t.collectionMap { - for _, layout := range c.storageType2VolumeLayout { + for _, col := range t.collectionMap.Items { + c := col.(*Collection) + for _, layout := range c.storageType2VolumeLayout.Items { if layout != nil { - tmp := layout.ToMap() + tmp := layout.(*VolumeLayout).ToMap() tmp["collection"] = c.Name layouts = append(layouts, tmp) } diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index 72846f20b..d6fa2213e 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -1,13 +1,14 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "net/url" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" ) func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool { @@ -79,13 +80,15 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis return isCommitSuccess } func (t *Topology) Vacuum(garbageThreshold string) int { - for _, c := range t.collectionMap { - for _, vl := range c.storageType2VolumeLayout { + for _, col := range t.collectionMap.Items { + c := col.(*Collection) + for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { - for vid, locationlist := range vl.vid2location { - if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(vl, vid, locationlist) { - batchVacuumVolumeCommit(vl, vid, locationlist) + volumeLayout := vl.(*VolumeLayout) + for vid, locationlist := range volumeLayout.vid2location { + if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) { + if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) { + batchVacuumVolumeCommit(volumeLayout, vid, locationlist) } } } diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index 2859d3992..6124c0da2 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -1,11 +1,12 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "fmt" "math/rand" "sync" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" ) /* @@ -29,6 +30,10 @@ type VolumeGrowth struct { accessLock sync.Mutex } +func (o *VolumeGrowOption) String() string { + return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode) +} + func NewDefaultVolumeGrowth() *VolumeGrowth { return &VolumeGrowth{} } diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go index 5581c87ce..267b36042 100644 --- a/go/topology/volume_growth_test.go +++ b/go/topology/volume_growth_test.go @@ -1,11 +1,12 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/sequence" - "github.com/chrislusf/weed-fs/go/storage" "encoding/json" "fmt" "testing" + + "github.com/chrislusf/weed-fs/go/sequence" + "github.com/chrislusf/weed-fs/go/storage" ) var topologyLayout = ` diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 7bb0cf7e3..4b1d3dad9 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -1,11 +1,13 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "errors" + "fmt" "math/rand" "sync" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" ) // mapping from volume to its locations, inverted from server to volume @@ -28,6 +30,10 @@ func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeL } } +func (vl *VolumeLayout) String() string { + return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit) +} + func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go index 176f469b9..0f892c010 100644 --- a/go/topology/volume_location_list.go +++ b/go/topology/volume_location_list.go @@ -1,6 +1,8 @@ package topology -import () +import ( + "fmt" +) type VolumeLocationList struct { list []*DataNode @@ -10,6 +12,10 @@ func NewVolumeLocationList() *VolumeLocationList { return &VolumeLocationList{} } +func (dnll *VolumeLocationList) String() string { + return fmt.Sprintf("%v", dnll.list) +} + func (dnll *VolumeLocationList) Head() *DataNode { return dnll.list[0] } diff --git a/go/util/concurrent_read_map.go b/go/util/concurrent_read_map.go new file mode 100644 index 000000000..880fe54e3 --- /dev/null +++ b/go/util/concurrent_read_map.go @@ -0,0 +1,39 @@ +package util + +import ( + "sync" +) + +// A mostly for read map, which can thread-safely +// initialize the map entries. +type ConcurrentReadMap struct { + rmutex sync.RWMutex + mutex sync.Mutex + Items map[string]interface{} +} + +func NewConcurrentReadMap() *ConcurrentReadMap { + return &ConcurrentReadMap{Items: make(map[string]interface{})} +} + +func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + if value, ok := m.Items[key]; ok { + return value + } + value = newEntry() + m.Items[key] = value + return value +} + +func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} { + m.rmutex.RLock() + if value, ok := m.Items[key]; ok { + m.rmutex.RUnlock() + return value + } else { + m.rmutex.RUnlock() + return m.initMapEntry(key, newEntry) + } +} diff --git a/go/util/config.go b/go/util/config.go index 050fd0e64..4cf1d7c64 100644 --- a/go/util/config.go +++ b/go/util/config.go @@ -10,9 +10,10 @@ package util import ( "bytes" - "github.com/chrislusf/weed-fs/go/glog" "encoding/json" "os" + + "github.com/chrislusf/weed-fs/go/glog" ) type Config struct { diff --git a/go/util/constants.go b/go/util/constants.go index db1ca38e5..d677ba44f 100644 --- a/go/util/constants.go +++ b/go/util/constants.go @@ -1,7 +1,5 @@ package util -import () - const ( - VERSION = "0.64" + VERSION = "0.67" ) diff --git a/go/util/file_util.go b/go/util/file_util.go index 412d98458..30e24f001 100644 --- a/go/util/file_util.go +++ b/go/util/file_util.go @@ -2,9 +2,10 @@ package util import ( "bufio" - "github.com/chrislusf/weed-fs/go/glog" "errors" "os" + + "github.com/chrislusf/weed-fs/go/glog" ) func TestFolderWritable(folder string) (err error) { diff --git a/go/util/net_timeout.go b/go/util/net_timeout.go index eb80822b5..f274e4802 100644 --- a/go/util/net_timeout.go +++ b/go/util/net_timeout.go @@ -1,9 +1,10 @@ package util import ( - "github.com/chrislusf/weed-fs/go/stats" "net" "time" + + "github.com/chrislusf/weed-fs/go/stats" ) // Listener wraps a net.Listener, and gives a place to store the timeout diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index fec8472e5..f4f0b1874 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -2,9 +2,6 @@ package main import ( "bufio" - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/operation" - "github.com/chrislusf/weed-fs/go/util" "fmt" "io" "math" @@ -16,6 +13,10 @@ import ( "strings" "sync" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/util" ) type BenchmarkOptions struct { @@ -30,11 +31,14 @@ type BenchmarkOptions struct { sequentialRead *bool collection *string cpuprofile *string + maxCpu *int vid2server map[string]string //cache for vid locations + } var ( - b BenchmarkOptions + b BenchmarkOptions + sharedBytes []byte ) func init() { @@ -50,33 +54,35 @@ func init() { b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") - b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "write cpu profile to file") + b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") + b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") b.vid2server = make(map[string]string) + sharedBytes = make([]byte, 1024) } var cmdBenchmark = &Command{ UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000", Short: "benchmark on writing millions of files and read out", Long: `benchmark on an empty weed file system. - + Two tests during benchmark: 1) write lots of small files to the system 2) read the files out - + The file content is mostly zero, but no compression is done. - + You can choose to only benchmark read or write. During write, the list of uploaded file ids is stored in "-list" specified file. You can also use your own list of file ids to run read test. - + Write speed and read speed will be collected. The numbers are used to get a sense of the system. Usually your network or the hard drive is the real bottleneck. - + Another thing to watch is whether the volumes are evenly distributed to each volume server. Because the 7 more benchmark volumes are randomly distributed to servers with free slots, it's highly possible some servers have uneven amount of - benchmark volumes. To remedy this, you can use this to grow the benchmark volumes + benchmark volumes. To remedy this, you can use this to grow the benchmark volumes before starting the benchmark command: http://localhost:9333/vol/grow?collection=benchmark&count=5 @@ -87,18 +93,17 @@ var cmdBenchmark = &Command{ } var ( - wait sync.WaitGroup - writeStats *stats - readStats *stats - serverLimitChan map[string]chan bool + wait sync.WaitGroup + writeStats *stats + readStats *stats ) -func init() { - serverLimitChan = make(map[string]chan bool) -} - func runbenchmark(cmd *Command, args []string) bool { fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) + if *b.maxCpu < 1 { + *b.maxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*b.maxCpu) if *b.cpuprofile != "" { f, err := os.Create(*b.cpuprofile) if err != nil { @@ -122,12 +127,12 @@ func runbenchmark(cmd *Command, args []string) bool { func bench_write() { fileIdLineChan := make(chan string) finishChan := make(chan bool) - writeStats = newStats() + writeStats = newStats(*b.concurrency) idChan := make(chan int) - wait.Add(*b.concurrency) go writeFileIds(*b.idListFile, fileIdLineChan, finishChan) for i := 0; i < *b.concurrency; i++ { - go writeFiles(idChan, fileIdLineChan, writeStats) + wait.Add(1) + go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i]) } writeStats.start = time.Now() writeStats.total = *b.numberOfFiles @@ -138,28 +143,30 @@ func bench_write() { close(idChan) wait.Wait() writeStats.end = time.Now() - wait.Add(1) + wait.Add(2) finishChan <- true finishChan <- true - close(finishChan) wait.Wait() + close(finishChan) writeStats.printStats() } func bench_read() { fileIdLineChan := make(chan string) finishChan := make(chan bool) - readStats = newStats() - wait.Add(*b.concurrency) + readStats = newStats(*b.concurrency) go readFileIds(*b.idListFile, fileIdLineChan) readStats.start = time.Now() readStats.total = *b.numberOfFiles go readStats.checkProgress("Randomly Reading Benchmark", finishChan) for i := 0; i < *b.concurrency; i++ { - go readFiles(fileIdLineChan, readStats) + wait.Add(1) + go readFiles(fileIdLineChan, &readStats.localStats[i]) } wait.Wait() + wait.Add(1) finishChan <- true + wait.Wait() close(finishChan) readStats.end = time.Now() readStats.printStats() @@ -170,126 +177,102 @@ type delayedFile struct { fp *operation.FilePart } -func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { +func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { + defer wait.Done() delayedDeleteChan := make(chan *delayedFile, 100) var waitForDeletions sync.WaitGroup for i := 0; i < 7; i++ { + waitForDeletions.Add(1) go func() { - waitForDeletions.Add(1) + defer waitForDeletions.Done() for df := range delayedDeleteChan { - if df == nil { - break - } if df.enterTime.After(time.Now()) { time.Sleep(df.enterTime.Sub(time.Now())) } - fp := df.fp - serverLimitChan[fp.Server] <- true - if e := util.Delete("http://" + fp.Server + "/" + fp.Fid); e == nil { + if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil { s.completed++ } else { s.failed++ } - <-serverLimitChan[fp.Server] } - waitForDeletions.Done() }() } - for { - if id, ok := <-idChan; ok { - start := time.Now() - fileSize := int64(*b.fileSize + rand.Intn(64)) - fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} - if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { - fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection - if _, ok := serverLimitChan[fp.Server]; !ok { - serverLimitChan[fp.Server] = make(chan bool, 7) - } - serverLimitChan[fp.Server] <- true - if _, err := fp.Upload(0, *b.server); err == nil { - if rand.Intn(100) < *b.deletePercentage { - s.total++ - delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} - } else { - fileIdLineChan <- fp.Fid - } - s.completed++ - s.transferred += fileSize + for id := range idChan { + start := time.Now() + fileSize := int64(*b.fileSize + rand.Intn(64)) + fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} + if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { + fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection + if _, err := fp.Upload(0, *b.server); err == nil { + if rand.Intn(100) < *b.deletePercentage { + s.total++ + delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} } else { - s.failed++ - } - writeStats.addSample(time.Now().Sub(start)) - <-serverLimitChan[fp.Server] - if *cmdBenchmark.IsDebug { - fmt.Printf("writing %d file %s\n", id, fp.Fid) + fileIdLineChan <- fp.Fid } + s.completed++ + s.transferred += fileSize } else { s.failed++ - println("writing file error:", err.Error()) + fmt.Printf("Failed to write with error:%v\n", err) + } + writeStats.addSample(time.Now().Sub(start)) + if *cmdBenchmark.IsDebug { + fmt.Printf("writing %d file %s\n", id, fp.Fid) } } else { - break + s.failed++ + println("writing file error:", err.Error()) } } close(delayedDeleteChan) waitForDeletions.Wait() - wait.Done() } -func readFiles(fileIdLineChan chan string, s *stats) { - serverLimitChan := make(map[string]chan bool) +func readFiles(fileIdLineChan chan string, s *stat) { + defer wait.Done() masterLimitChan := make(chan bool, 1) - for { - if fid, ok := <-fileIdLineChan; ok { - if len(fid) == 0 { - continue - } - if fid[0] == '#' { - continue - } - if *cmdBenchmark.IsDebug { - fmt.Printf("reading file %s\n", fid) - } - parts := strings.SplitN(fid, ",", 2) - vid := parts[0] - start := time.Now() - if server, ok := b.vid2server[vid]; !ok { - masterLimitChan <- true - if _, now_ok := b.vid2server[vid]; !now_ok { - if ret, err := operation.Lookup(*b.server, vid); err == nil { - if len(ret.Locations) > 0 { - server = ret.Locations[0].PublicUrl - b.vid2server[vid] = server - } + for fid := range fileIdLineChan { + if len(fid) == 0 { + continue + } + if fid[0] == '#' { + continue + } + if *cmdBenchmark.IsDebug { + fmt.Printf("reading file %s\n", fid) + } + parts := strings.SplitN(fid, ",", 2) + vid := parts[0] + start := time.Now() + if server, ok := b.vid2server[vid]; !ok { + masterLimitChan <- true + if _, now_ok := b.vid2server[vid]; !now_ok { + if ret, err := operation.Lookup(*b.server, vid); err == nil { + if len(ret.Locations) > 0 { + server = ret.Locations[0].PublicUrl + b.vid2server[vid] = server } } - <-masterLimitChan } - if server, ok := b.vid2server[vid]; ok { - if _, ok := serverLimitChan[server]; !ok { - serverLimitChan[server] = make(chan bool, 7) - } - serverLimitChan[server] <- true - url := "http://" + server + "/" + fid - if bytesRead, err := util.Get(url); err == nil { - s.completed++ - s.transferred += int64(len(bytesRead)) - readStats.addSample(time.Now().Sub(start)) - } else { - s.failed++ - println("!!!! Failed to read from ", url, " !!!!!") - } - <-serverLimitChan[server] + <-masterLimitChan + } + if server, ok := b.vid2server[vid]; ok { + url := "http://" + server + "/" + fid + if bytesRead, err := util.Get(url); err == nil { + s.completed++ + s.transferred += int64(len(bytesRead)) + readStats.addSample(time.Now().Sub(start)) } else { s.failed++ - println("!!!! volume id ", vid, " location not found!!!!!") + fmt.Printf("Failed to read %s error:%v\n", url, err) } } else { - break + s.failed++ + println("!!!! volume id ", vid, " location not found!!!!!") } } - wait.Done() } func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) { @@ -353,20 +336,28 @@ const ( // An efficient statics collecting and rendering type stats struct { - data []int - overflow []int + data []int + overflow []int + localStats []stat + start time.Time + end time.Time + total int +} +type stat struct { completed int failed int total int transferred int64 - start time.Time - end time.Time } var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100} -func newStats() *stats { - return &stats{data: make([]int, benchResolution), overflow: make([]int, 0)} +func newStats(n int) *stats { + return &stats{ + data: make([]int, benchResolution), + overflow: make([]int, 0), + localStats: make([]stat, n), + } } func (s *stats) addSample(d time.Duration) { @@ -387,28 +378,41 @@ func (s *stats) checkProgress(testName string, finishChan chan bool) { for { select { case <-finishChan: + wait.Done() return case t := <-ticker: - completed, transferred, taken := s.completed-lastCompleted, s.transferred-lastTransferred, t.Sub(lastTime) + completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total + for _, localStat := range s.localStats { + completed += localStat.completed + transferred += localStat.transferred + total += localStat.total + } fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n", - s.completed, s.total, float64(s.completed)*100/float64(s.total), - float64(completed)*float64(int64(time.Second))/float64(int64(taken)), - float64(transferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024), + completed, total, float64(completed)*100/float64(total), + float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)), + float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024), ) - lastCompleted, lastTransferred, lastTime = s.completed, s.transferred, t + lastCompleted, lastTransferred, lastTime = completed, transferred, t } } } func (s *stats) printStats() { + completed, failed, transferred, total := 0, 0, int64(0), s.total + for _, localStat := range s.localStats { + completed += localStat.completed + failed += localStat.failed + transferred += localStat.transferred + total += localStat.total + } timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000 fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency) fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken) - fmt.Printf("Complete requests: %d\n", s.completed) - fmt.Printf("Failed requests: %d\n", s.failed) - fmt.Printf("Total transferred: %d bytes\n", s.transferred) - fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(s.completed)/timeTaken) - fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(s.transferred)/1024/timeTaken) + fmt.Printf("Complete requests: %d\n", completed) + fmt.Printf("Failed requests: %d\n", failed) + fmt.Printf("Total transferred: %d bytes\n", transferred) + fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken) + fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken) n, sum := 0, 0 min, max := 10000000, 0 for i := 0; i < len(s.data); i++ { @@ -496,15 +500,32 @@ func (l *FakeReader) Read(p []byte) (n int, err error) { } else { n = len(p) } - for i := 0; i < n-8; i += 8 { - for s := uint(0); s < 8; s++ { - p[i] = byte(l.id >> (s * 8)) + if n >= 8 { + for i := 0; i < 8; i++ { + p[i] = byte(l.id >> uint(i*8)) } } l.size -= int64(n) return } +func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) { + size := int(l.size) + bufferSize := len(sharedBytes) + for size > 0 { + tempBuffer := sharedBytes + if size < bufferSize { + tempBuffer = sharedBytes[0:size] + } + count, e := w.Write(tempBuffer) + if e != nil { + return int64(size), e + } + size -= count + } + return l.size, nil +} + func Readln(r *bufio.Reader) ([]byte, error) { var ( isPrefix bool = true diff --git a/go/weed/compact.go b/go/weed/compact.go index a99e6c93e..71c4ea90f 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -12,7 +12,7 @@ func init() { var cmdCompact = &Command{ UsageLine: "compact -dir=/tmp -volumeId=234", - Short: "run weed tool compact on volume file if corrupted", + Short: "run weed tool compact on volume file", Long: `Force an compaction to remove deleted files from volume files. The compacted .dat file is stored as .cpd file. The compacted .idx file is stored as .cpx file. diff --git a/go/weed/download.go b/go/weed/download.go index c30d17915..c782654f5 100644 --- a/go/weed/download.go +++ b/go/weed/download.go @@ -1,14 +1,15 @@ package main import ( - "github.com/chrislusf/weed-fs/go/operation" - "github.com/chrislusf/weed-fs/go/util" "fmt" "io" "io/ioutil" "os" "path" "strings" + + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/util" ) var ( diff --git a/go/weed/export.go b/go/weed/export.go index 81bc21f6e..c9cc0e3fe 100644 --- a/go/weed/export.go +++ b/go/weed/export.go @@ -3,8 +3,6 @@ package main import ( "archive/tar" "bytes" - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "fmt" "os" "path" @@ -12,6 +10,9 @@ import ( "strings" "text/template" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" ) func init() { @@ -36,7 +37,7 @@ var cmdExport = &Command{ var ( exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files") exportCollection = cmdExport.Flag.String("collection", "", "the volume collection name") - exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") + exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.") dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout") format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}") tarFh *tar.Writer diff --git a/go/weed/filer.go b/go/weed/filer.go index 7dbecb4d0..5b3fd2b67 100644 --- a/go/weed/filer.go +++ b/go/weed/filer.go @@ -1,13 +1,14 @@ package main import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/util" - "github.com/chrislusf/weed-fs/go/weed/weed_server" "net/http" "os" "strconv" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/weed/weed_server" ) var ( @@ -20,6 +21,11 @@ type FilerOptions struct { collection *string defaultReplicaPlacement *string dir *string + redirectOnRead *bool + cassandra_server *string + cassandra_keyspace *string + redis_server *string + redis_database *int } func init() { @@ -28,14 +34,19 @@ func init() { f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data") - f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "Default replication type if not specified.") + f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") + f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") + f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") + f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") + f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") + f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server") } var cmdFiler = &Command{ UsageLine: "filer -port=8888 -dir=/tmp -master=<ip:port>", Short: "start a file server that points to a master server", Long: `start a file server which accepts REST operation for any files. - + //create or overwrite the file, the directories /path/to will be automatically created POST /path/to/file //get the file content @@ -44,22 +55,27 @@ var cmdFiler = &Command{ POST /path/to/ //return a json format subdirectory and files listing GET /path/to/ - + Current <fullpath~fileid> mapping metadata store is local embedded leveldb. It should be highly scalable to hundreds of millions of files on a modest machine. - + Future we will ensure it can avoid of being SPOF. `, } func runFiler(cmd *Command, args []string) bool { + if err := util.TestFolderWritable(*f.dir); err != nil { glog.Fatalf("Check Meta Folder (-dir) Writable %s : %s", *f.dir, err) } r := http.NewServeMux() - _, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection) + _, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection, + *f.defaultReplicaPlacement, *f.redirectOnRead, + *f.cassandra_server, *f.cassandra_keyspace, + *f.redis_server, *f.redis_database, + ) if nfs_err != nil { glog.Fatalf(nfs_err.Error()) } diff --git a/go/weed/fix.go b/go/weed/fix.go index ad573875a..e66075ed2 100644 --- a/go/weed/fix.go +++ b/go/weed/fix.go @@ -1,11 +1,12 @@ package main import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "os" "path" "strconv" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" ) func init() { @@ -16,7 +17,7 @@ func init() { var cmdFix = &Command{ UsageLine: "fix -dir=/tmp -volumeId=234", Short: "run weed tool fix on index file if corrupted", - Long: `Fix runs the WeedFS fix command to re-create the index .idx file. + Long: `Fix runs the SeeweedFS fix command to re-create the index .idx file. `, } diff --git a/go/weed/master.go b/go/weed/master.go index 6617c8ca6..de4b5cb4b 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -1,16 +1,17 @@ package main import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/util" - "github.com/chrislusf/weed-fs/go/weed/weed_server" - "github.com/gorilla/mux" "net/http" "os" "runtime" "strconv" "strings" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/weed/weed_server" + "github.com/gorilla/mux" ) func init() { @@ -29,6 +30,7 @@ var cmdMaster = &Command{ var ( mport = cmdMaster.Flag.Int("port", 9333, "http listen port") masterIp = cmdMaster.Flag.String("ip", "", "master listening ip address, default to listen on all network interfaces") + masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") mPublicIp = cmdMaster.Flag.String("publicIp", "", "peer accessible <ip>|<server_name>") metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list") @@ -40,6 +42,7 @@ var ( mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + masterSecureKey = cmdMaster.Flag.String("secure.key", "", "secret key to check permission") masterWhiteList []string ) @@ -58,10 +61,11 @@ func runMaster(cmd *Command, args []string) bool { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *mport, *metaFolder, - *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList, + *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, + masterWhiteList, *masterSecureKey, ) - listeningAddress := *masterIp + ":" + strconv.Itoa(*mport) + listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport) glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress) diff --git a/go/weed/mount.go b/go/weed/mount.go index 0290e0f53..66e645387 100644 --- a/go/weed/mount.go +++ b/go/weed/mount.go @@ -1,7 +1,5 @@ package main -import () - type MountOptions struct { filer *string dir *string diff --git a/go/weed/mount_std.go b/go/weed/mount_std.go index e5fc0986c..808c6c563 100644 --- a/go/weed/mount_std.go +++ b/go/weed/mount_std.go @@ -3,15 +3,16 @@ package main import ( + "fmt" + "os" + "runtime" + "bazil.org/fuse" "bazil.org/fuse/fs" "github.com/chrislusf/weed-fs/go/filer" "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/util" - "fmt" - "os" - "runtime" ) func runMount(cmd *Command, args []string) bool { diff --git a/go/weed/server.go b/go/weed/server.go index 1d854d641..0b973f7e1 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -1,10 +1,6 @@ package main import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/util" - "github.com/chrislusf/weed-fs/go/weed/weed_server" - "github.com/gorilla/mux" "net/http" "os" "runtime" @@ -13,6 +9,11 @@ import ( "strings" "sync" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/weed/weed_server" + "github.com/gorilla/mux" ) type ServerOptions struct { @@ -31,17 +32,17 @@ func init() { var cmdServer = &Command{ UsageLine: "server -port=8080 -dir=/tmp -volume.max=5 -ip=server_name", Short: "start a server, including volume server, and automatically elect a master server", - Long: `start both a volume server to provide storage spaces + Long: `start both a volume server to provide storage spaces and a master server to provide volume=>location mapping service and sequence number of file ids - + This is provided as a convenient way to start both volume server and master server. The servers are exactly the same as starting them separately. So other volume servers can use this embedded master server also. - + Optionally, one filer server can be started. Logically, filer servers should not be in a cluster. They run with meta data on disk, not shared. So each filer server is different. - + `, } @@ -55,6 +56,7 @@ var ( serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list") + serverSecureKey = cmdServer.Flag.String("secure.key", "", "secret key to ensure authenticated access") serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") @@ -72,12 +74,18 @@ var ( ) func init() { - serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "write cpu profile to file") + serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file") filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") + filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") + filerOptions.cassandra_server = cmdFiler.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") + filerOptions.cassandra_keyspace = cmdFiler.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") + filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") + filerOptions.redis_database = cmdFiler.Flag.Int("filer.redis.database", 0, "the database on the redis server") + } func runServer(cmd *Command, args []string) bool { @@ -98,6 +106,10 @@ func runServer(cmd *Command, args []string) bool { } } + if *filerOptions.redirectOnRead { + *isStartingFiler = true + } + *filerOptions.master = *serverPublicIp + ":" + strconv.Itoa(*masterPort) if *filerOptions.defaultReplicaPlacement == "" { @@ -149,7 +161,11 @@ func runServer(cmd *Command, args []string) bool { if *isStartingFiler { go func() { r := http.NewServeMux() - _, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection) + _, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, + *filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead, + "", "", + "", 0, + ) if nfs_err != nil { glog.Fatalf(nfs_err.Error()) } @@ -176,7 +192,8 @@ func runServer(cmd *Command, args []string) bool { go func() { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, - *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, serverWhiteList, + *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, + serverWhiteList, *serverSecureKey, ) glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort)) @@ -208,8 +225,8 @@ func runServer(cmd *Command, args []string) bool { time.Sleep(100 * time.Millisecond) r := http.NewServeMux() volumeServer := weed_server.NewVolumeServer(r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts, - *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, serverWhiteList, - *volumeFixJpgOrientation, + *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, + serverWhiteList, *volumeFixJpgOrientation, ) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort)) diff --git a/go/weed/shell.go b/go/weed/shell.go index c8043e0dd..f2c4990ea 100644 --- a/go/weed/shell.go +++ b/go/weed/shell.go @@ -2,9 +2,10 @@ package main import ( "bufio" - "github.com/chrislusf/weed-fs/go/glog" "fmt" "os" + + "github.com/chrislusf/weed-fs/go/glog" ) func init() { diff --git a/go/weed/signal_handling_notsupported.go b/go/weed/signal_handling_notsupported.go index ad4f37b0c..343cf7de2 100644 --- a/go/weed/signal_handling_notsupported.go +++ b/go/weed/signal_handling_notsupported.go @@ -2,7 +2,5 @@ package main -import () - func OnInterrupt(fn func()) { } diff --git a/go/weed/upload.go b/go/weed/upload.go index 4eae4d274..2d67c0bd9 100644 --- a/go/weed/upload.go +++ b/go/weed/upload.go @@ -1,11 +1,12 @@ package main import ( - "github.com/chrislusf/weed-fs/go/operation" "encoding/json" "fmt" "os" "path/filepath" + + "github.com/chrislusf/weed-fs/go/operation" ) var ( diff --git a/go/weed/version.go b/go/weed/version.go index 63441509e..8d3a6fed7 100644 --- a/go/weed/version.go +++ b/go/weed/version.go @@ -1,9 +1,10 @@ package main import ( - "github.com/chrislusf/weed-fs/go/util" "fmt" "runtime" + + "github.com/chrislusf/weed-fs/go/util" ) var cmdVersion = &Command{ diff --git a/go/weed/volume.go b/go/weed/volume.go index 17d03f0c5..1683e1927 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -1,15 +1,16 @@ package main import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/util" - "github.com/chrislusf/weed-fs/go/weed/weed_server" "net/http" "os" "runtime" "strconv" "strings" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/weed/weed_server" ) func init() { @@ -26,11 +27,12 @@ var cmdVolume = &Command{ var ( vport = cmdVolume.Flag.Int("port", 8080, "http listen port") + volumeSecurePort = cmdVolume.Flag.Int("port.secure", 8443, "https listen port, active when SSL certs are specified. Not ready yet.") volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") ip = cmdVolume.Flag.String("ip", "", "ip or server name") publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>") - bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + volumeBindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds") @@ -69,6 +71,7 @@ func runVolume(cmd *Command, args []string) bool { if *publicIp == "" { if *ip == "" { + *ip = "127.0.0.1" *publicIp = "localhost" } else { *publicIp = *ip @@ -81,11 +84,12 @@ func runVolume(cmd *Command, args []string) bool { r := http.NewServeMux() volumeServer := weed_server.NewVolumeServer(r, *ip, *vport, *publicIp, folders, maxCounts, - *masterNode, *vpulse, *dataCenter, *rack, volumeWhiteList, + *masterNode, *vpulse, *dataCenter, *rack, + volumeWhiteList, *fixJpgOrientation, ) - listeningAddress := *bindIp + ":" + strconv.Itoa(*vport) + listeningAddress := *volumeBindIp + ":" + strconv.Itoa(*vport) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress) diff --git a/go/weed/volume_test.go b/go/weed/volume_test.go index 764362a2b..ef00a8c7c 100644 --- a/go/weed/volume_test.go +++ b/go/weed/volume_test.go @@ -1,10 +1,11 @@ package main import ( - "github.com/chrislusf/weed-fs/go/glog" "net/http" "testing" "time" + + "github.com/chrislusf/weed-fs/go/glog" ) func TestXYZ(t *testing.T) { diff --git a/go/weed/weed.go b/go/weed/weed.go index c1f5a72de..c304b7f35 100644 --- a/go/weed/weed.go +++ b/go/weed/weed.go @@ -1,7 +1,6 @@ package main import ( - "github.com/chrislusf/weed-fs/go/glog" "flag" "fmt" "io" @@ -13,6 +12,8 @@ import ( "time" "unicode" "unicode/utf8" + + "github.com/chrislusf/weed-fs/go/glog" ) var IsDebug *bool diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index 816107dc5..44ffcce47 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -2,18 +2,19 @@ package weed_server import ( "bytes" - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/operation" - "github.com/chrislusf/weed-fs/go/stats" - "github.com/chrislusf/weed-fs/go/storage" - "github.com/chrislusf/weed-fs/go/util" "encoding/json" + "errors" "fmt" - "net" "net/http" "path/filepath" "strconv" "strings" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/stats" + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" ) var serverStats *stats.ServerStats @@ -24,7 +25,7 @@ func init() { } -func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err error) { +func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) { var bytes []byte if r.FormValue("pretty") != "" { bytes, err = json.MarshalIndent(obj, "", " ") @@ -37,9 +38,11 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err err callback := r.FormValue("callback") if callback == "" { w.Header().Set("Content-Type", "application/json") + w.WriteHeader(httpStatus) _, err = w.Write(bytes) } else { w.Header().Set("Content-Type", "application/javascript") + w.WriteHeader(httpStatus) if _, err = w.Write([]uint8(callback)); err != nil { return } @@ -51,64 +54,45 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err err return } } + return } // wrapper for writeJson - just logs errors -func writeJsonQuiet(w http.ResponseWriter, r *http.Request, obj interface{}) { - if err := writeJson(w, r, obj); err != nil { +func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) { + if err := writeJson(w, r, httpStatus, obj); err != nil { glog.V(0).Infof("error writing JSON %s: %s", obj, err.Error()) } } -func writeJsonError(w http.ResponseWriter, r *http.Request, err error) { - w.WriteHeader(http.StatusInternalServerError) +func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) { m := make(map[string]interface{}) m["error"] = err.Error() - writeJsonQuiet(w, r, m) + writeJsonQuiet(w, r, httpStatus, m) } func debug(params ...interface{}) { glog.V(4).Infoln(params) } -func secure(whiteList []string, f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - if len(whiteList) == 0 { - f(w, r) - return - } - host, _, err := net.SplitHostPort(r.RemoteAddr) - if err == nil { - for _, ip := range whiteList { - if ip == host { - f(w, r) - return - } - } - } - writeJsonQuiet(w, r, map[string]interface{}{"error": "No write permisson from " + host}) - } -} - func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { m := make(map[string]interface{}) if r.Method != "POST" { - m["error"] = "Only submit via POST!" - writeJsonQuiet(w, r, m) + writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) return } debug("parsing upload file...") fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r) if pe != nil { - writeJsonError(w, r, pe) + writeJsonError(w, r, http.StatusBadRequest, pe) return } debug("assigning file id for", fname) + r.ParseForm() assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl")) if ae != nil { - writeJsonError(w, r, ae) + writeJsonError(w, r, http.StatusInternalServerError, ae) return } @@ -120,7 +104,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st debug("upload file to store", url) uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType) if err != nil { - writeJsonError(w, r, err) + writeJsonError(w, r, http.StatusInternalServerError, err) return } @@ -128,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st m["fid"] = assignResult.Fid m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid m["size"] = uploadResult.Size - writeJsonQuiet(w, r, m) + writeJsonQuiet(w, r, http.StatusCreated, m) return } @@ -137,10 +121,10 @@ func deleteForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st fids := r.Form["fid"] ret, err := operation.DeleteFiles(masterUrl, fids) if err != nil { - writeJsonError(w, r, err) + writeJsonError(w, r, http.StatusInternalServerError, err) return } - writeJsonQuiet(w, r, ret) + writeJsonQuiet(w, r, http.StatusAccepted, ret) } func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { @@ -180,12 +164,12 @@ func statsCounterHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION m["Counters"] = serverStats - writeJsonQuiet(w, r, m) + writeJsonQuiet(w, r, http.StatusOK, m) } func statsMemoryHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION m["Memory"] = stats.MemStat() - writeJsonQuiet(w, r, m) + writeJsonQuiet(w, r, http.StatusOK, m) } diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go index 5ff0ed986..18a02b5e0 100644 --- a/go/weed/weed_server/filer_server.go +++ b/go/weed/weed_server/filer_server.go @@ -1,32 +1,57 @@ package weed_server import ( - "github.com/chrislusf/weed-fs/go/filer" - "github.com/chrislusf/weed-fs/go/glog" "net/http" "strconv" + + "github.com/chrislusf/weed-fs/go/filer" + "github.com/chrislusf/weed-fs/go/filer/cassandra_store" + "github.com/chrislusf/weed-fs/go/filer/embedded_filer" + "github.com/chrislusf/weed-fs/go/filer/flat_namespace" + "github.com/chrislusf/weed-fs/go/filer/redis_store" + "github.com/chrislusf/weed-fs/go/glog" ) type FilerServer struct { - port string - master string - collection string - filer filer.Filer + port string + master string + collection string + defaultReplication string + redirectOnRead bool + filer filer.Filer } -func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string) (fs *FilerServer, err error) { +func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string, + replication string, redirectOnRead bool, + cassandra_server string, cassandra_keyspace string, + redis_server string, redis_database int, +) (fs *FilerServer, err error) { fs = &FilerServer{ - master: master, - collection: collection, - port: ":" + strconv.Itoa(port), + master: master, + collection: collection, + defaultReplication: replication, + redirectOnRead: redirectOnRead, + port: ":" + strconv.Itoa(port), } - if fs.filer, err = filer.NewFilerEmbedded(master, dir); err != nil { - glog.Fatal("Can not start filer in dir", dir, ": ", err.Error()) - return + if cassandra_server != "" { + cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) + if err != nil { + glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err) + } + fs.filer = flat_namespace.NewFlatNamesapceFiler(master, cassandra_store) + } else if redis_server != "" { + redis_store := redis_store.NewRedisStore(redis_server, redis_database) + fs.filer = flat_namespace.NewFlatNamesapceFiler(master, redis_store) + } else { + if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil { + glog.Fatalf("Can not start filer in dir %s : %v", err) + return + } + + r.HandleFunc("/admin/mv", fs.moveHandler) } - r.HandleFunc("/admin/mv", fs.moveHandler) r.HandleFunc("/", fs.filerHandler) return fs, nil diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index e36e7c310..0811b7973 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -1,12 +1,8 @@ package weed_server import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/operation" - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" - "github.com/syndtr/goleveldb/leveldb" "io" "io/ioutil" "math/rand" @@ -14,6 +10,11 @@ import ( "net/url" "strconv" "strings" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/util" + "github.com/syndtr/goleveldb/leveldb" ) func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { @@ -49,7 +50,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque limit = 100 } m["Files"], _ = fs.filer.ListFiles(r.URL.Path, lastFileName, limit) - writeJsonQuiet(w, r, m) + writeJsonQuiet(w, r, http.StatusOK, m) } func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { if strings.HasSuffix(r.URL.Path, "/") { @@ -80,7 +81,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, return } urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl - u, _ := url.Parse("http://" + urlLocation + "/" + fileId) + urlString := "http://" + urlLocation + "/" + fileId + if fs.redirectOnRead { + http.Redirect(w, r, urlString, http.StatusFound) + return + } + u, _ := url.Parse(urlString) request := &http.Request{ Method: r.Method, URL: u, @@ -96,7 +102,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, resp, do_err := util.Do(request) if do_err != nil { glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) - writeJsonError(w, r, do_err) + writeJsonError(w, r, http.StatusInternalServerError, do_err) return } defer resp.Body.Close() @@ -109,10 +115,14 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() - assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection, query.Get("ttl")) + replication := query.Get("replication") + if replication == "" { + replication = fs.defaultReplication + } + assignResult, ae := operation.Assign(fs.master, 1, replication, fs.collection, query.Get("ttl")) if ae != nil { glog.V(0).Infoln("failing to assign a file id", ae.Error()) - writeJsonError(w, r, ae) + writeJsonError(w, r, http.StatusInternalServerError, ae) return } @@ -132,14 +142,14 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { resp, do_err := util.Do(request) if do_err != nil { glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error()) - writeJsonError(w, r, do_err) + writeJsonError(w, r, http.StatusInternalServerError, do_err) return } defer resp.Body.Close() resp_body, ra_err := ioutil.ReadAll(resp.Body) if ra_err != nil { glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error()) - writeJsonError(w, r, ra_err) + writeJsonError(w, r, http.StatusInternalServerError, ra_err) return } glog.V(4).Infoln("post result", string(resp_body)) @@ -147,12 +157,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body)) - writeJsonError(w, r, unmarshal_err) + writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err) return } if ret.Error != "" { glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) - writeJsonError(w, r, errors.New(ret.Error)) + writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error)) return } path := r.URL.Path @@ -162,18 +172,20 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } else { operation.DeleteFile(fs.master, assignResult.Fid) //clean up glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - writeJsonError(w, r, errors.New("Can not to write to folder "+path+" without a file name")) + writeJsonError(w, r, http.StatusInternalServerError, + errors.New("Can not to write to folder "+path+" without a file name")) return } } glog.V(4).Infoln("saving", path, "=>", assignResult.Fid) if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { operation.DeleteFile(fs.master, assignResult.Fid) //clean up - glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error()) - writeJsonError(w, r, db_err) + glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) + writeJsonError(w, r, http.StatusInternalServerError, db_err) return } w.WriteHeader(http.StatusCreated) + w.Write(resp_body) } // curl -X DELETE http://localhost:8888/path/to @@ -191,10 +203,9 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } } if err == nil { - w.WriteHeader(http.StatusAccepted) - writeJsonQuiet(w, r, map[string]string{"error": ""}) + writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) } else { glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error()) - writeJsonError(w, r, err) + writeJsonError(w, r, http.StatusInternalServerError, err) } } diff --git a/go/weed/weed_server/filer_server_handlers_admin.go b/go/weed/weed_server/filer_server_handlers_admin.go index ff52dff24..3f67b30f1 100644 --- a/go/weed/weed_server/filer_server_handlers_admin.go +++ b/go/weed/weed_server/filer_server_handlers_admin.go @@ -1,8 +1,9 @@ package weed_server import ( - "github.com/chrislusf/weed-fs/go/glog" "net/http" + + "github.com/chrislusf/weed-fs/go/glog" ) /* @@ -21,7 +22,7 @@ func (fs *FilerServer) moveHandler(w http.ResponseWriter, r *http.Request) { err := fs.filer.Move(from, to) if err != nil { glog.V(4).Infoln("moving", from, "->", to, err.Error()) - writeJsonError(w, r, err) + writeJsonError(w, r, http.StatusInternalServerError, err) } else { w.WriteHeader(http.StatusOK) } diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 401f6cfdb..056b1fe7b 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -1,16 +1,19 @@ package weed_server import ( + "fmt" + "net/http" + "net/http/httputil" + "net/url" + "sync" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/security" "github.com/chrislusf/weed-fs/go/sequence" "github.com/chrislusf/weed-fs/go/topology" "github.com/chrislusf/weed-fs/go/util" "github.com/goraft/raft" "github.com/gorilla/mux" - "net/http" - "net/http/httputil" - "net/url" - "sync" ) type MasterServer struct { @@ -20,7 +23,6 @@ type MasterServer struct { pulseSeconds int defaultReplicaPlacement string garbageThreshold string - whiteList []string Topo *topology.Topology vg *topology.VolumeGrowth @@ -36,6 +38,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, defaultReplicaPlacement string, garbageThreshold string, whiteList []string, + secureKey string, ) *MasterServer { ms := &MasterServer{ port: port, @@ -43,7 +46,6 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, pulseSeconds: pulseSeconds, defaultReplicaPlacement: defaultReplicaPlacement, garbageThreshold: garbageThreshold, - whiteList: whiteList, } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() @@ -55,20 +57,22 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") - r.HandleFunc("/dir/assign", ms.proxyToLeader(secure(ms.whiteList, ms.dirAssignHandler))) - r.HandleFunc("/dir/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.dirLookupHandler))) - r.HandleFunc("/dir/join", ms.proxyToLeader(secure(ms.whiteList, ms.dirJoinHandler))) - r.HandleFunc("/dir/status", ms.proxyToLeader(secure(ms.whiteList, ms.dirStatusHandler))) - r.HandleFunc("/col/delete", ms.proxyToLeader(secure(ms.whiteList, ms.collectionDeleteHandler))) - r.HandleFunc("/vol/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.volumeLookupHandler))) - r.HandleFunc("/vol/grow", ms.proxyToLeader(secure(ms.whiteList, ms.volumeGrowHandler))) - r.HandleFunc("/vol/status", ms.proxyToLeader(secure(ms.whiteList, ms.volumeStatusHandler))) - r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler))) - r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler)) - r.HandleFunc("/delete", secure(ms.whiteList, ms.deleteFromMasterServerHandler)) + guard := security.NewGuard(whiteList, secureKey) + + r.HandleFunc("/dir/assign", ms.proxyToLeader(guard.Secure(ms.dirAssignHandler))) + r.HandleFunc("/dir/lookup", ms.proxyToLeader(guard.Secure(ms.dirLookupHandler))) + r.HandleFunc("/dir/join", ms.proxyToLeader(guard.Secure(ms.dirJoinHandler))) + r.HandleFunc("/dir/status", ms.proxyToLeader(guard.Secure(ms.dirStatusHandler))) + r.HandleFunc("/col/delete", ms.proxyToLeader(guard.Secure(ms.collectionDeleteHandler))) + r.HandleFunc("/vol/lookup", ms.proxyToLeader(guard.Secure(ms.volumeLookupHandler))) + r.HandleFunc("/vol/grow", ms.proxyToLeader(guard.Secure(ms.volumeGrowHandler))) + r.HandleFunc("/vol/status", ms.proxyToLeader(guard.Secure(ms.volumeStatusHandler))) + r.HandleFunc("/vol/vacuum", ms.proxyToLeader(guard.Secure(ms.volumeVacuumHandler))) + r.HandleFunc("/submit", guard.Secure(ms.submitFromMasterServerHandler)) + r.HandleFunc("/delete", guard.Secure(ms.deleteFromMasterServerHandler)) r.HandleFunc("/{fileId}", ms.redirectHandler) - r.HandleFunc("/stats/counter", secure(ms.whiteList, statsCounterHandler)) - r.HandleFunc("/stats/memory", secure(ms.whiteList, statsMemoryHandler)) + r.HandleFunc("/stats/counter", guard.Secure(statsCounterHandler)) + r.HandleFunc("/stats/memory", guard.Secure(statsMemoryHandler)) ms.Topo.StartRefreshWritableVolumes(garbageThreshold) @@ -100,7 +104,8 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ defer func() { <-ms.bounedLeaderChan }() targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) if err != nil { - writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL http://" + ms.Topo.RaftServer.Leader() + " Parse Error " + err.Error()}) + writeJsonError(w, r, http.StatusInternalServerError, + fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err)) return } glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader()) diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go index 93e9e7d9a..7b6ce58f3 100644 --- a/go/weed/weed_server/master_server_handlers.go +++ b/go/weed/weed_server/master_server_handlers.go @@ -1,12 +1,14 @@ package weed_server import ( - "github.com/chrislusf/weed-fs/go/operation" - "github.com/chrislusf/weed-fs/go/stats" - "github.com/chrislusf/weed-fs/go/storage" + "fmt" "net/http" "strconv" "strings" + + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/stats" + "github.com/chrislusf/weed-fs/go/storage" ) func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) { @@ -49,10 +51,11 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) collection := r.FormValue("collection") //optional, but can be faster if too many collections volumeLocations := ms.lookupVolumeId(vids, collection) location := volumeLocations[vid] + httpStatus := http.StatusOK if location.Error != "" { - w.WriteHeader(http.StatusNotFound) + httpStatus = http.StatusNotFound } - writeJsonQuiet(w, r, location) + writeJsonQuiet(w, r, httpStatus, location) } // This can take batched volumeIds, &volumeId=x&volumeId=y&volumeId=z @@ -61,7 +64,7 @@ func (ms *MasterServer) volumeLookupHandler(w http.ResponseWriter, r *http.Reque vids := r.Form["volumeId"] collection := r.FormValue("collection") //optional, but can be faster if too many collections volumeLocations := ms.lookupVolumeId(vids, collection) - writeJsonQuiet(w, r, volumeLocations) + writeJsonQuiet(w, r, http.StatusOK, volumeLocations) } func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { @@ -73,22 +76,21 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) option, err := ms.getVolumeGrowOption(r) if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, operation.AssignResult{Error: err.Error()}) + writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) return } - if !ms.Topo.HasWriableVolume(option) { + if !ms.Topo.HasWritableVolume(option) { if ms.Topo.FreeSpace() <= 0 { - w.WriteHeader(http.StatusNotFound) - writeJsonQuiet(w, r, operation.AssignResult{Error: "No free volumes left!"}) + writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"}) return } else { ms.vgLock.Lock() defer ms.vgLock.Unlock() - if !ms.Topo.HasWriableVolume(option) { + if !ms.Topo.HasWritableVolume(option) { if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { - writeJsonQuiet(w, r, operation.AssignResult{Error: "Cannot grow volume group! " + err.Error()}) + writeJsonError(w, r, http.StatusInternalServerError, + fmt.Errorf("Cannot grow volume group! %v", err)) return } } @@ -96,9 +98,8 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) } fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option) if err == nil { - writeJsonQuiet(w, r, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) + writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, operation.AssignResult{Error: err.Error()}) + writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) } } diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index c9a8020c2..437044eee 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -1,30 +1,32 @@ package weed_server import ( - proto "code.google.com/p/goprotobuf/proto" - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/operation" - "github.com/chrislusf/weed-fs/go/storage" - "github.com/chrislusf/weed-fs/go/topology" - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" + "fmt" "io/ioutil" "net/http" "strconv" "strings" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/topology" + "github.com/chrislusf/weed-fs/go/util" + "github.com/golang/protobuf/proto" ) func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { collection, ok := ms.Topo.GetCollection(r.FormValue("collection")) if !ok { - writeJsonQuiet(w, r, map[string]interface{}{"error": "collection " + r.FormValue("collection") + "does not exist!"}) + writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist!", r.FormValue("collection"))) return } for _, server := range collection.ListVolumeServers() { _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection")) if err != nil { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + writeJsonError(w, r, http.StatusInternalServerError, err) return } } @@ -34,12 +36,12 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if err != nil { - writeJsonError(w, r, err) + writeJsonError(w, r, http.StatusBadRequest, err) return } joinMessage := &operation.JoinMessage{} if err = proto.Unmarshal(body, joinMessage); err != nil { - writeJsonError(w, r, err) + writeJsonError(w, r, http.StatusBadRequest, err) return } if *joinMessage.Ip == "" { @@ -48,7 +50,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { if glog.V(4) { if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil { glog.V(0).Infoln("json marshaling error: ", jsonError) - writeJsonError(w, r, jsonError) + writeJsonError(w, r, http.StatusBadRequest, jsonError) return } else { glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData)) @@ -56,14 +58,14 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { } ms.Topo.ProcessJoinMessage(joinMessage) - writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024}) + writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024}) } func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION m["Topology"] = ms.Topo.ToMap() - writeJsonQuiet(w, r, m) + writeJsonQuiet(w, r, http.StatusOK, m) } func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { @@ -80,8 +82,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request count := 0 option, err := ms.getVolumeGrowOption(r) if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + writeJsonError(w, r, http.StatusNotAcceptable, err) return } if err == nil { @@ -96,11 +97,9 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request } } if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + writeJsonError(w, r, http.StatusNotAcceptable, err) } else { - w.WriteHeader(http.StatusOK) - writeJsonQuiet(w, r, map[string]interface{}{"count": count}) + writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"count": count}) } } @@ -108,7 +107,7 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque m := make(map[string]interface{}) m["Version"] = util.VERSION m["Volumes"] = ms.Topo.ToVolumeMap() - writeJsonQuiet(w, r, m) + writeJsonQuiet(w, r, http.StatusOK, m) } func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { @@ -122,8 +121,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) if machines != nil && len(machines) > 0 { http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) } else { - w.WriteHeader(http.StatusNotFound) - writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found.", volumeId)) } } @@ -143,7 +141,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r * } } -func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool { +func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool { vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go index e41867076..b9aaef2b0 100644 --- a/go/weed/weed_server/raft_server.go +++ b/go/weed/weed_server/raft_server.go @@ -2,19 +2,22 @@ package weed_server import ( "bytes" - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/topology" "encoding/json" "errors" "fmt" - "github.com/goraft/raft" - "github.com/gorilla/mux" "io/ioutil" "math/rand" "net/http" "net/url" + "os" + "path" "strings" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/topology" + "github.com/goraft/raft" + "github.com/gorilla/mux" ) type RaftServer struct { @@ -44,6 +47,14 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin var err error transporter := raft.NewHTTPTransporter("/cluster", 0) transporter.Transport.MaxIdleConnsPerHost = 1024 + glog.V(1).Infof("Starting RaftServer with IP:%v:", httpAddr) + + // Clear old cluster configurations if peers are set + if len(s.peers) > 0 { + os.RemoveAll(path.Join(s.dataDir, "conf")) + os.RemoveAll(path.Join(s.dataDir, "log")) + os.RemoveAll(path.Join(s.dataDir, "snapshot")) + } s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "") if err != nil { @@ -52,35 +63,30 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin } transporter.Install(s.raftServer, s) s.raftServer.SetHeartbeatInterval(1 * time.Second) - s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 1150 * time.Millisecond) + s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 3450 * time.Millisecond) s.raftServer.Start() s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST") s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET") - // Join to leader if specified. if len(s.peers) > 0 { - if !s.raftServer.IsLogEmpty() { - glog.V(0).Infoln("Starting cluster with existing logs.") - } else { - glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) - firstJoinError := s.Join(s.peers) - if firstJoinError != nil { - glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.") - _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: "http://" + s.httpAddr, - }) - if err != nil { - glog.V(0).Infoln(err) - return nil - } + // Join to leader if specified. + glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + firstJoinError := s.Join(s.peers) + if firstJoinError != nil { + glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.") + _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: "http://" + s.httpAddr, + }) + if err != nil { + glog.V(0).Infoln(err) + return nil } } - - // Initialize the server by joining itself. } else if s.raftServer.IsLogEmpty() { + // Initialize the server by joining itself. glog.V(0).Infoln("Initializing new cluster") _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ @@ -94,7 +100,7 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin } } else { - glog.V(0).Infoln("Recovered from log") + glog.V(0).Infoln("Old conf,log,snapshot should have been removed.") } return s diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go index 4d51c0767..629de248b 100644 --- a/go/weed/weed_server/raft_server_handlers.go +++ b/go/weed/weed_server/raft_server_handlers.go @@ -1,13 +1,14 @@ package weed_server import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/operation" "encoding/json" - "github.com/goraft/raft" "io/ioutil" "net/http" "strings" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/goraft/raft" ) // Handles incoming RAFT joins. @@ -59,5 +60,5 @@ func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) { if leader, e := s.topo.Leader(); e == nil { ret.Leader = leader } - writeJsonQuiet(w, r, ret) + writeJsonQuiet(w, r, http.StatusOK, ret) } diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 2a9085f3b..9ceeb0149 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -1,12 +1,14 @@ package weed_server import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "math/rand" "net/http" "strconv" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/security" + "github.com/chrislusf/weed-fs/go/storage" ) type VolumeServer struct { @@ -14,8 +16,8 @@ type VolumeServer struct { pulseSeconds int dataCenter string rack string - whiteList []string store *storage.Store + guard *security.Guard FixJpgOrientation bool } @@ -23,29 +25,31 @@ type VolumeServer struct { func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int, masterNode string, pulseSeconds int, dataCenter string, rack string, - whiteList []string, fixJpgOrientation bool) *VolumeServer { + whiteList []string, + fixJpgOrientation bool) *VolumeServer { publicUrl := publicIp + ":" + strconv.Itoa(port) vs := &VolumeServer{ masterNode: masterNode, pulseSeconds: pulseSeconds, dataCenter: dataCenter, rack: rack, - whiteList: whiteList, FixJpgOrientation: fixJpgOrientation, } vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts) - r.HandleFunc("/status", secure(vs.whiteList, vs.statusHandler)) - r.HandleFunc("/admin/assign_volume", secure(vs.whiteList, vs.assignVolumeHandler)) - r.HandleFunc("/admin/vacuum_volume_check", secure(vs.whiteList, vs.vacuumVolumeCheckHandler)) - r.HandleFunc("/admin/vacuum_volume_compact", secure(vs.whiteList, vs.vacuumVolumeCompactHandler)) - r.HandleFunc("/admin/vacuum_volume_commit", secure(vs.whiteList, vs.vacuumVolumeCommitHandler)) - r.HandleFunc("/admin/freeze_volume", secure(vs.whiteList, vs.freezeVolumeHandler)) - r.HandleFunc("/admin/delete_collection", secure(vs.whiteList, vs.deleteCollectionHandler)) - r.HandleFunc("/stats/counter", secure(vs.whiteList, statsCounterHandler)) - r.HandleFunc("/stats/memory", secure(vs.whiteList, statsMemoryHandler)) - r.HandleFunc("/stats/disk", secure(vs.whiteList, vs.statsDiskHandler)) - r.HandleFunc("/delete", secure(vs.whiteList, vs.batchDeleteHandler)) + vs.guard = security.NewGuard(whiteList, "") + + r.HandleFunc("/status", vs.guard.Secure(vs.statusHandler)) + r.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler)) + r.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler)) + r.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler)) + r.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler)) + r.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler)) + r.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler)) + r.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler)) + r.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler)) + r.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler)) + r.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler)) r.HandleFunc("/", vs.storeHandler) go func() { diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go index ce14f6a87..6b47ee84d 100644 --- a/go/weed/weed_server/volume_server_handlers.go +++ b/go/weed/weed_server/volume_server_handlers.go @@ -1,12 +1,7 @@ package weed_server import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/images" - "github.com/chrislusf/weed-fs/go/operation" - "github.com/chrislusf/weed-fs/go/stats" - "github.com/chrislusf/weed-fs/go/storage" - "github.com/chrislusf/weed-fs/go/topology" + "errors" "io" "mime" "mime/multipart" @@ -14,6 +9,13 @@ import ( "strconv" "strings" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/images" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/stats" + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/topology" ) var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") @@ -28,13 +30,13 @@ func (vs *VolumeServer) storeHandler(w http.ResponseWriter, r *http.Request) { vs.GetOrHeadHandler(w, r) case "DELETE": stats.DeleteRequest() - secure(vs.whiteList, vs.DeleteHandler)(w, r) + vs.guard.Secure(vs.DeleteHandler)(w, r) case "PUT": stats.WriteRequest() - secure(vs.whiteList, vs.PostHandler)(w, r) + vs.guard.Secure(vs.PostHandler)(w, r) case "POST": stats.WriteRequest() - secure(vs.whiteList, vs.PostHandler)(w, r) + vs.guard.Secure(vs.PostHandler)(w, r) } } @@ -234,35 +236,34 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { if e := r.ParseForm(); e != nil { glog.V(0).Infoln("form parse error:", e) - writeJsonError(w, r, e) + writeJsonError(w, r, http.StatusBadRequest, e) return } vid, _, _, _, _ := parseURLPath(r.URL.Path) volumeId, ve := storage.NewVolumeId(vid) if ve != nil { glog.V(0).Infoln("NewVolumeId error:", ve) - writeJsonError(w, r, ve) + writeJsonError(w, r, http.StatusBadRequest, ve) return } needle, ne := storage.NewNeedle(r, vs.FixJpgOrientation) if ne != nil { - writeJsonError(w, r, ne) + writeJsonError(w, r, http.StatusBadRequest, ne) return } ret := operation.UploadResult{} size, errorStatus := topology.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r) - if errorStatus == "" { - w.WriteHeader(http.StatusCreated) - } else { - w.WriteHeader(http.StatusInternalServerError) + httpStatus := http.StatusCreated + if errorStatus != "" { + httpStatus = http.StatusInternalServerError ret.Error = errorStatus } if needle.HasName() { ret.Name = string(needle.Name) } ret.Size = size - writeJsonQuiet(w, r, ret) + writeJsonQuiet(w, r, httpStatus, ret) } func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { @@ -279,7 +280,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { if ok != nil { m := make(map[string]uint32) m["size"] = 0 - writeJsonQuiet(w, r, m) + writeJsonQuiet(w, r, http.StatusNotFound, m) return } @@ -292,14 +293,13 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { ret := topology.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r) if ret != 0 { - w.WriteHeader(http.StatusAccepted) + m := make(map[string]uint32) + m["size"] = uint32(count) + writeJsonQuiet(w, r, http.StatusAccepted, m) } else { - w.WriteHeader(http.StatusInternalServerError) + writeJsonError(w, r, http.StatusInternalServerError, errors.New("Deletion Failed.")) } - m := make(map[string]uint32) - m["size"] = uint32(count) - writeJsonQuiet(w, r, m) } //Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas. @@ -333,7 +333,5 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques } } - w.WriteHeader(http.StatusAccepted) - - writeJsonQuiet(w, r, ret) + writeJsonQuiet(w, r, http.StatusAccepted, ret) } diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go index 1118c8017..1581a5770 100644 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ b/go/weed/weed_server/volume_server_handlers_admin.go @@ -1,26 +1,27 @@ package weed_server import ( + "net/http" + "path/filepath" + "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/stats" "github.com/chrislusf/weed-fs/go/util" - "net/http" - "path/filepath" ) func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION m["Volumes"] = vs.store.Status() - writeJsonQuiet(w, r, m) + writeJsonQuiet(w, r, http.StatusOK, m) } func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl")) if err == nil { - writeJsonQuiet(w, r, map[string]string{"error": ""}) + writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + writeJsonError(w, r, http.StatusNotAcceptable, err) } glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err) } @@ -32,9 +33,9 @@ func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.R } err := vs.store.DeleteCollection(r.FormValue("collection")) if err == nil { - writeJsonQuiet(w, r, map[string]string{"error": ""}) + writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + writeJsonError(w, r, http.StatusInternalServerError, err) } glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err) } @@ -43,9 +44,9 @@ func (vs *VolumeServer) freezeVolumeHandler(w http.ResponseWriter, r *http.Reque //TODO: notify master that this volume will be read-only err := vs.store.FreezeVolume(r.FormValue("volume")) if err == nil { - writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) + writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + writeJsonError(w, r, http.StatusInternalServerError, err) } glog.V(2).Infoln("freeze volume =", r.FormValue("volume"), ", error =", err) } @@ -60,5 +61,5 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) } } m["DiskStatues"] = ds - writeJsonQuiet(w, r, m) + writeJsonQuiet(w, r, http.StatusOK, m) } diff --git a/go/weed/weed_server/volume_server_handlers_vacuum.go b/go/weed/weed_server/volume_server_handlers_vacuum.go index b0600d799..cb30e10b4 100644 --- a/go/weed/weed_server/volume_server_handlers_vacuum.go +++ b/go/weed/weed_server/volume_server_handlers_vacuum.go @@ -1,34 +1,35 @@ package weed_server import ( - "github.com/chrislusf/weed-fs/go/glog" "net/http" + + "github.com/chrislusf/weed-fs/go/glog" ) func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) if err == nil { - writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret}) + writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"error": "", "result": ret}) } else { - writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false}) + writeJsonQuiet(w, r, http.StatusInternalServerError, map[string]interface{}{"error": err.Error(), "result": false}) } glog.V(2).Infoln("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) } func (vs *VolumeServer) vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { err := vs.store.CompactVolume(r.FormValue("volume")) if err == nil { - writeJsonQuiet(w, r, map[string]string{"error": ""}) + writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + writeJsonError(w, r, http.StatusInternalServerError, err) } glog.V(2).Infoln("compacted volume =", r.FormValue("volume"), ", error =", err) } func (vs *VolumeServer) vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { err := vs.store.CommitCompactVolume(r.FormValue("volume")) if err == nil { - writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) + writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + writeJsonError(w, r, http.StatusInternalServerError, err) } glog.V(2).Infoln("commit compact volume =", r.FormValue("volume"), ", error =", err) } |
