aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-07-13 11:38:01 -0700
committerChris Lu <chris.lu@gmail.com>2013-07-13 11:38:01 -0700
commitd4105f9b46283ac37276b5413b5c53d5f9e42ad1 (patch)
tree00cead464852103fcc70f18ff9cdb6a1ae668e90 /go
parent175456870a7b9e92b2447da70eb875599d643e8e (diff)
downloadseaweedfs-d4105f9b46283ac37276b5413b5c53d5f9e42ad1.tar.xz
seaweedfs-d4105f9b46283ac37276b5413b5c53d5f9e42ad1.zip
add support for multiple folders and multiple max limit: eg
-dir=folder1,folder2,folder3 -max=7,8,9
Diffstat (limited to 'go')
-rw-r--r--go/storage/store.go160
-rw-r--r--go/weed/volume.go54
2 files changed, 139 insertions, 75 deletions
diff --git a/go/storage/store.go b/go/storage/store.go
index c8d1ca856..0238bbbbd 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -11,28 +11,32 @@ import (
"strings"
)
-type Store struct {
+type DiskLocation struct {
+ directory string
+ maxVolumeCount int
volumes map[VolumeId]*Volume
- dir string
- Port int
- Ip string
- PublicUrl string
- MaxVolumeCount int
-
+}
+type Store struct {
+ Port int
+ Ip string
+ PublicUrl string
+ locations []*DiskLocation
masterNode string
dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
volumeSizeLimit uint64 //read from the master
-
}
-func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) {
- s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount}
- s.volumes = make(map[VolumeId]*Volume)
- s.loadExistingVolumes()
-
- log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
+func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
+ s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
+ s.locations = make([]*DiskLocation, 0)
+ for i := 0; i < len(dirnames); i++ {
+ location := &DiskLocation{directory: dirnames[i], maxVolumeCount: maxVolumeCounts[i]}
+ location.volumes = make(map[VolumeId]*Volume)
+ location.loadExistingVolumes()
+ s.locations = append(s.locations, location)
+ }
return
}
func (s *Store) AddVolume(volumeListString string, replicationType string) error {
@@ -56,7 +60,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
}
end, end_err := strconv.ParseUint(pair[1], 10, 64)
if end_err != nil {
- return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1] )
+ return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), rt); err != nil {
@@ -67,13 +71,35 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
}
return e
}
+func (s *Store) findVolume(vid VolumeId) *Volume {
+ for _, location := range s.locations {
+ if v, found := location.volumes[vid]; found {
+ return v
+ }
+ }
+ return nil
+}
+func (s *Store) findFreeLocation() (ret *DiskLocation) {
+ max := 0
+ for _, location := range s.locations {
+ currentFreeCount := location.maxVolumeCount - len(location.volumes)
+ if currentFreeCount > max {
+ max = currentFreeCount
+ ret = location
+ }
+ }
+ return ret
+}
func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
- if s.volumes[vid] != nil {
+ if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %s already exists!", vid)
}
- log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
- s.volumes[vid], err = NewVolume(s.dir, vid, replicationType)
- return err
+ if location := s.findFreeLocation(); location != nil {
+ log.Println("In dir", location.directory, "adds volume =", vid, ", replicationType =", replicationType)
+ location.volumes[vid], err = NewVolume(location.directory, vid, replicationType)
+ return err
+ }
+ return fmt.Errorf("No more free space left")
}
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
@@ -85,60 +111,76 @@ func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString
if e != nil {
return fmt.Errorf("garbageThreshold %s is not a valid float number!", garbageThresholdString), false
}
- return nil, garbageThreshold < s.volumes[vid].garbageLevel()
+ if v := s.findVolume(vid); v != nil {
+ return nil, garbageThreshold < v.garbageLevel()
+ }
+ return fmt.Errorf("volume id %s is not found during check compact!", vid), false
}
func (s *Store) CompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
}
- return s.volumes[vid].compact()
+ if v := s.findVolume(vid); v != nil {
+ return v.compact()
+ }
+ return fmt.Errorf("volume id %s is not found during compact!", vid)
}
func (s *Store) CommitCompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
}
- return s.volumes[vid].commitCompact()
+ if v := s.findVolume(vid); v != nil {
+ return v.commitCompact()
+ }
+ return fmt.Errorf("volume id %s is not found during commit compact!", vid)
}
func (s *Store) FreezeVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
}
- if s.volumes[vid].readOnly {
- return fmt.Errorf("Volume %s is already read-only", volumeIdString)
+ if v := s.findVolume(vid); v != nil {
+ if v.readOnly {
+ return fmt.Errorf("Volume %s is already read-only", volumeIdString)
+ }
+ return v.freeze()
+ } else {
+ return fmt.Errorf("volume id %s is not found during freeze!", vid)
}
- return s.volumes[vid].freeze()
}
-func (s *Store) loadExistingVolumes() {
- if dirs, err := ioutil.ReadDir(s.dir); err == nil {
+func (l *DiskLocation) loadExistingVolumes() {
+ if dirs, err := ioutil.ReadDir(l.directory); err == nil {
for _, dir := range dirs {
name := dir.Name()
if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
base := name[:len(name)-len(".dat")]
if vid, err := NewVolumeId(base); err == nil {
- if s.volumes[vid] == nil {
- if v, e := NewVolume(s.dir, vid, CopyNil); e == nil {
- s.volumes[vid] = v
- log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
+ if l.volumes[vid] == nil {
+ if v, e := NewVolume(l.directory, vid, CopyNil); e == nil {
+ l.volumes[vid] = v
+ log.Println("In dir", l.directory, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
}
}
}
}
}
}
+ log.Println("Store started on dir:", l.directory, "with", len(l.volumes), "volumes", "max", l.maxVolumeCount)
}
func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo
- for k, v := range s.volumes {
- s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
- RepType: v.ReplicaType, Version: v.Version(),
- FileCount: v.nm.FileCount(),
- DeleteCount: v.nm.DeletedCount(),
- DeletedByteCount: v.nm.DeletedSize(),
- ReadOnly: v.readOnly}
- stats = append(stats, s)
+ for _, location := range s.locations {
+ for k, v := range location.volumes {
+ s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
+ RepType: v.ReplicaType, Version: v.Version(),
+ FileCount: v.nm.FileCount(),
+ DeleteCount: v.nm.DeletedCount(),
+ DeletedByteCount: v.nm.DeletedSize(),
+ ReadOnly: v.readOnly}
+ stats = append(stats, s)
+ }
}
return stats
}
@@ -158,14 +200,18 @@ func (s *Store) SetRack(rack string) {
}
func (s *Store) Join() error {
stats := new([]*VolumeInfo)
- for k, v := range s.volumes {
- s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
- RepType: v.ReplicaType, Version: v.Version(),
- FileCount: v.nm.FileCount(),
- DeleteCount: v.nm.DeletedCount(),
- DeletedByteCount: v.nm.DeletedSize(),
- ReadOnly: v.readOnly}
- *stats = append(*stats, s)
+ maxVolumeCount := 0
+ for _, location := range s.locations {
+ maxVolumeCount = maxVolumeCount + location.maxVolumeCount
+ for k, v := range location.volumes {
+ s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
+ RepType: v.ReplicaType, Version: v.Version(),
+ FileCount: v.nm.FileCount(),
+ DeleteCount: v.nm.DeletedCount(),
+ DeletedByteCount: v.nm.DeletedSize(),
+ ReadOnly: v.readOnly}
+ *stats = append(*stats, s)
+ }
}
bytes, _ := json.Marshal(stats)
values := make(url.Values)
@@ -176,7 +222,7 @@ func (s *Store) Join() error {
values.Add("ip", s.Ip)
values.Add("publicUrl", s.PublicUrl)
values.Add("volumes", string(bytes))
- values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
+ values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
values.Add("dataCenter", s.dataCenter)
values.Add("rack", s.rack)
jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
@@ -192,12 +238,14 @@ func (s *Store) Join() error {
return nil
}
func (s *Store) Close() {
- for _, v := range s.volumes {
- v.Close()
+ for _, location := range s.locations {
+ for _, v := range location.volumes {
+ v.Close()
+ }
}
}
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
- if v := s.volumes[i]; v != nil {
+ if v := s.findVolume(i); v != nil {
if v.readOnly {
err = fmt.Errorf("Volume %s is read only!", i)
return
@@ -221,22 +269,22 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
return
}
func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
- if v := s.volumes[i]; v != nil && !v.readOnly {
+ if v := s.findVolume(i); v != nil && !v.readOnly {
return v.delete(n)
}
return 0, nil
}
func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
- if v := s.volumes[i]; v != nil {
+ if v := s.findVolume(i); v != nil {
return v.read(n)
}
return 0, fmt.Errorf("Volume %s not found!", i)
}
func (s *Store) GetVolume(i VolumeId) *Volume {
- return s.volumes[i]
+ return s.findVolume(i)
}
func (s *Store) HasVolume(i VolumeId) bool {
- _, ok := s.volumes[i]
- return ok
+ v := s.findVolume(i)
+ return v != nil
}
diff --git a/go/weed/volume.go b/go/weed/volume.go
index 759200927..ecd547860 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -29,17 +29,17 @@ var cmdVolume = &Command{
}
var (
- vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
- volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files")
- ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name")
- publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
- masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
- vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
- maxVolumeCount = cmdVolume.Flag.Int("max", 7, "maximum number of volumes")
- vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
- vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
- dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
- rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
+ vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
+ volumeFolders = cmdVolume.Flag.String("dir", "/tmp", "directories to store data files. dir[,dir]...")
+ maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
+ ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name")
+ publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
+ masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
+ vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
+ vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
+ vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
+ rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
store *storage.Store
)
@@ -295,21 +295,37 @@ func runVolume(cmd *Command, args []string) bool {
*vMaxCpu = runtime.NumCPU()
}
runtime.GOMAXPROCS(*vMaxCpu)
- fileInfo, err := os.Stat(*volumeFolder)
- if err != nil {
- log.Fatalf("No Existing Folder:%s", *volumeFolder)
+ folders := strings.Split(*volumeFolders, ",")
+ maxCountStrings := strings.Split(*maxVolumeCounts, ",")
+ maxCounts := make([]int, 0)
+ for _, maxString := range maxCountStrings {
+ if max, e := strconv.Atoi(maxString); e == nil {
+ maxCounts = append(maxCounts, max)
+ } else {
+ log.Fatalf("The max specified in -max not a valid number %s", max)
+ }
}
- if !fileInfo.IsDir() {
- log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder)
+ if len(folders) != len(maxCounts) {
+ log.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts))
+ }
+ for _, folder := range folders {
+ fileInfo, err := os.Stat(folder)
+ if err != nil {
+ log.Fatalf("No Existing Folder:%s", folder)
+ }
+ if !fileInfo.IsDir() {
+ log.Fatalf("Volume Folder should not be a file:%s", folder)
+ }
+ perm := fileInfo.Mode().Perm()
+ log.Println("Volume Folder", folder)
+ log.Println("Permission:", perm)
}
- perm := fileInfo.Mode().Perm()
- log.Println("Volume Folder permission:", perm)
if *publicUrl == "" {
*publicUrl = *ip + ":" + strconv.Itoa(*vport)
}
- store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount)
+ store = storage.NewStore(*vport, *ip, *publicUrl, folders, maxCounts)
defer store.Close()
http.HandleFunc("/", storeHandler)
http.HandleFunc("/status", statusHandler)