aboutsummaryrefslogtreecommitdiff
path: root/weed-fs/src/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'weed-fs/src/pkg')
-rw-r--r--weed-fs/src/pkg/directory/volume_mapping.go48
1 files changed, 42 insertions, 6 deletions
diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go
index ff5b0bf64..fa2ae62d3 100644
--- a/weed-fs/src/pkg/directory/volume_mapping.go
+++ b/weed-fs/src/pkg/directory/volume_mapping.go
@@ -11,7 +11,8 @@ import (
)
const (
- ChunkSizeLimit = 1 * 1024 * 1024 * 1024 //1G, can not be more than max(uint32)*8
+ ChunkSizeLimit = 1 * 1000 * 1000 * 1000 //1G, can not be more than max(uint32)*8
+ FileIdSaveInterval = 10000
)
type MachineInfo struct {
@@ -35,14 +36,17 @@ type Mapper struct {
Writers []int // transient array of Writers volume id
GlobalVolumeSequence uint64
+
+ FileIdSequence uint64
+ fileIdCounter uint64
}
func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) *Machine {
- return &Machine{Server:MachineInfo{Url:server,PublicUrl:publicServer},Volumes:volumes,Capacity:capacity}
+ return &Machine{Server: MachineInfo{Url: server, PublicUrl: publicServer}, Volumes: volumes, Capacity: capacity}
}
func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
- m = &Mapper{dir:dirname,fileName:filename,capacity:capacity}
+ m = &Mapper{dir: dirname, fileName: filename, capacity: capacity}
log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map"))
dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644)
m.vid2machineId = make(map[uint64]int)
@@ -67,11 +71,33 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
}
log.Println("Loaded mapping size", len(m.Machines))
}
+
+ seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644)
+ if se != nil {
+ m.FileIdSequence = FileIdSaveInterval
+ log.Println("Setting file id sequence", m.FileIdSequence)
+ } else {
+ decoder := gob.NewDecoder(seqFile)
+ defer seqFile.Close()
+ decoder.Decode(&m.FileIdSequence)
+ log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence + FileIdSaveInterval)
+ //in case the server stops between intervals
+ m.FileIdSequence += FileIdSaveInterval
+ }
return
}
-func (m *Mapper) PickForWrite() MachineInfo {
- vid := rand.Intn(len(m.Writers))
- return m.Machines[m.Writers[vid]].Server
+func (m *Mapper) PickForWrite() (vid uint64, server MachineInfo) {
+ machine := m.Machines[m.Writers[rand.Intn(len(m.Writers))]]
+ vid = machine.Volumes[rand.Intn(len(machine.Volumes))].Id
+ return vid, machine.Server
+}
+func (m *Mapper) NextFileId() uint64 {
+ if m.fileIdCounter <= 0 {
+ m.fileIdCounter = FileIdSaveInterval
+ m.saveSequence()
+ }
+ m.fileIdCounter--
+ return m.FileIdSequence - m.fileIdCounter
}
func (m *Mapper) Get(vid uint64) *Machine {
return m.Machines[m.vid2machineId[vid]]
@@ -141,3 +167,13 @@ func (m *Mapper) Save() {
encoder.Encode(m.Machines)
encoder.Encode(m.GlobalVolumeSequence)
}
+func (m *Mapper) saveSequence() {
+ log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
+ seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644)
+ if e != nil {
+ log.Fatalf("Sequence File Save [ERROR] %s\n", e)
+ }
+ defer seqFile.Close()
+ encoder := gob.NewEncoder(seqFile)
+ encoder.Encode(m.FileIdSequence)
+}