aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
authoryourchanges <yourchanges@gmail.com>2014-10-21 15:50:48 +0800
committeryourchanges <yourchanges@gmail.com>2014-10-21 15:50:48 +0800
commitf7bcd8e958ef185baeca0c455a397d49fcb62256 (patch)
treebb2a2de4fcb7f9ac7e1d65a78e82cbe1f5f17e36 /go
parent78ccbbf3d0766e1ececf91fa00766d1ed33050cc (diff)
parenta3c17f17b144c4298409fddda2d6bc4b39d38ca5 (diff)
downloadseaweedfs-f7bcd8e958ef185baeca0c455a397d49fcb62256.tar.xz
seaweedfs-f7bcd8e958ef185baeca0c455a397d49fcb62256.zip
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'go')
-rw-r--r--go/filer/client_operations.go2
-rw-r--r--go/filer/directory_in_map.go33
-rw-r--r--go/filer/filer_embedded.go2
-rw-r--r--go/filer/files_in_leveldb.go2
-rw-r--r--go/images/orientation.go5
-rw-r--r--go/operation/assign_file_id.go9
-rw-r--r--go/operation/delete_content.go2
-rw-r--r--go/operation/list_masters.go4
-rw-r--r--go/operation/lookup.go2
-rw-r--r--go/operation/submit.go13
-rw-r--r--go/operation/system_message.pb.go12
-rw-r--r--go/operation/upload_content.go2
-rw-r--r--go/proto/system_message.proto1
-rw-r--r--go/storage/cdb_map.go2
-rw-r--r--go/storage/cdb_map_test.go2
-rw-r--r--go/storage/compact_map_perf_test.go4
-rw-r--r--go/storage/compress.go2
-rw-r--r--go/storage/crc.go4
-rw-r--r--go/storage/file_id.go4
-rw-r--r--go/storage/needle.go15
-rw-r--r--go/storage/needle_map.go4
-rw-r--r--go/storage/needle_read_write.go25
-rw-r--r--go/storage/replica_placement.go8
-rw-r--r--go/storage/store.go73
-rw-r--r--go/storage/store_vacuum.go2
-rw-r--r--go/storage/volume.go132
-rw-r--r--go/storage/volume_info.go4
-rw-r--r--go/storage/volume_super_block.go75
-rw-r--r--go/storage/volume_super_block_test.go23
-rw-r--r--go/storage/volume_ttl.go135
-rw-r--r--go/storage/volume_ttl_test.go60
-rw-r--r--go/storage/volume_vacuum.go16
-rw-r--r--go/tools/read_index.go2
-rw-r--r--go/topology/allocate_volume.go11
-rw-r--r--go/topology/cluster_commands.go4
-rw-r--r--go/topology/collection.go25
-rw-r--r--go/topology/data_node.go10
-rw-r--r--go/topology/node.go4
-rw-r--r--go/topology/store_replicate.go8
-rw-r--r--go/topology/topology.go29
-rw-r--r--go/topology/topology_event_handling.go10
-rw-r--r--go/topology/topology_map.go2
-rw-r--r--go/topology/topology_vacuum.go8
-rw-r--r--go/topology/volume_growth.go16
-rw-r--r--go/topology/volume_growth_test.go4
-rw-r--r--go/topology/volume_layout.go17
-rw-r--r--go/util/bytes.go12
-rw-r--r--go/util/config.go2
-rw-r--r--go/util/constants.go2
-rw-r--r--go/util/file_util.go2
-rw-r--r--go/util/net_timeout.go2
-rw-r--r--go/weed/benchmark.go10
-rw-r--r--go/weed/compact.go6
-rw-r--r--go/weed/download.go4
-rw-r--r--go/weed/export.go8
-rw-r--r--go/weed/filer.go8
-rw-r--r--go/weed/fix.go4
-rw-r--r--go/weed/master.go8
-rw-r--r--go/weed/mount_std.go10
-rw-r--r--go/weed/server.go17
-rw-r--r--go/weed/shell.go2
-rw-r--r--go/weed/upload.go8
-rw-r--r--go/weed/version.go6
-rw-r--r--go/weed/volume.go11
-rw-r--r--go/weed/volume_test.go2
-rw-r--r--go/weed/weed.go4
-rw-r--r--go/weed/weed_server/common.go14
-rw-r--r--go/weed/weed_server/filer_server.go4
-rw-r--r--go/weed/weed_server/filer_server_handlers.go19
-rw-r--r--go/weed/weed_server/filer_server_handlers_admin.go2
-rw-r--r--go/weed/weed_server/master_server.go8
-rw-r--r--go/weed/weed_server/master_server_handlers.go6
-rw-r--r--go/weed/weed_server/master_server_handlers_admin.go19
-rw-r--r--go/weed/weed_server/raft_server.go4
-rw-r--r--go/weed/weed_server/raft_server_handlers.go4
-rw-r--r--go/weed/weed_server/volume_server.go5
-rw-r--r--go/weed/weed_server/volume_server_handlers.go12
-rw-r--r--go/weed/weed_server/volume_server_handlers_admin.go12
-rw-r--r--go/weed/weed_server/volume_server_handlers_vacuum.go2
79 files changed, 750 insertions, 318 deletions
diff --git a/go/filer/client_operations.go b/go/filer/client_operations.go
index 5d89fc865..0b006289f 100644
--- a/go/filer/client_operations.go
+++ b/go/filer/client_operations.go
@@ -3,7 +3,7 @@ package filer
import ()
import (
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"fmt"
diff --git a/go/filer/directory_in_map.go b/go/filer/directory_in_map.go
index 9c2ecdf80..1d88a78be 100644
--- a/go/filer/directory_in_map.go
+++ b/go/filer/directory_in_map.go
@@ -2,15 +2,18 @@ package filer
import (
"bufio"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
+ "sync"
)
+var writeLock sync.Mutex //serialize changes to dir.log
+
type DirectoryEntryInMap struct {
Name string
Parent *DirectoryEntryInMap
@@ -25,23 +28,25 @@ type DirectoryManagerInMap struct {
isLoading bool
}
-func (dm *DirectoryManagerInMap) NewDirectoryEntryInMap(parent *DirectoryEntryInMap, name string) (d *DirectoryEntryInMap) {
+func (dm *DirectoryManagerInMap) NewDirectoryEntryInMap(parent *DirectoryEntryInMap, name string) (d *DirectoryEntryInMap, err error) {
+ writeLock.Lock()
+ defer writeLock.Unlock()
d = &DirectoryEntryInMap{Name: name, Parent: parent, SubDirectories: make(map[string]*DirectoryEntryInMap)}
- dm.max++
- d.Id = dm.max
parts := make([]string, 0)
for p := d; p != nil && p.Name != ""; p = p.Parent {
parts = append(parts, p.Name)
}
n := len(parts)
if n <= 0 {
- return d
+ return nil, fmt.Errorf("Failed to create folder %s/%s", parent.Name, name)
}
for i := 0; i < n/2; i++ {
parts[i], parts[n-1-i] = parts[n-1-i], parts[i]
}
+ dm.max++
+ d.Id = dm.max
dm.log("add", "/"+strings.Join(parts, "/"), strconv.Itoa(int(d.Id)))
- return d
+ return d, nil
}
func (dm *DirectoryManagerInMap) log(words ...string) {
@@ -157,7 +162,11 @@ func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId DirectoryId
if i != len(parts)-1 {
return fmt.Errorf("%s should be created after parent %s!", dirPath, parts[i])
}
- sub = dm.NewDirectoryEntryInMap(dir, parts[i])
+ var err error
+ sub, err = dm.NewDirectoryEntryInMap(dir, parts[i])
+ if err != nil {
+ return err
+ }
if sub.Id != dirId {
return fmt.Errorf("%s should be have id %v instead of %v!", dirPath, sub.Id, dirId)
}
@@ -178,7 +187,11 @@ func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEn
for i := 1; i < len(parts); i++ {
sub, ok := dir.SubDirectories[parts[i]]
if !ok {
- sub = dm.NewDirectoryEntryInMap(dir, parts[i])
+ var err error
+ sub, err = dm.NewDirectoryEntryInMap(dir, parts[i])
+ if err != nil {
+ return nil, false
+ }
dir.SubDirectories[parts[i]] = sub
created = true
}
@@ -193,6 +206,8 @@ func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (DirectoryId, err
}
func (dm *DirectoryManagerInMap) MoveUnderDirectory(oldDirPath string, newParentDirPath string, newName string) error {
+ writeLock.Lock()
+ defer writeLock.Unlock()
oldDir, oe := dm.findDirectory(oldDirPath)
if oe != nil {
return oe
@@ -223,6 +238,8 @@ func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []Dir
return dirNames, nil
}
func (dm *DirectoryManagerInMap) DeleteDirectory(dirPath string) error {
+ writeLock.Lock()
+ defer writeLock.Unlock()
if dirPath == "/" {
return fmt.Errorf("Can not delete %s", dirPath)
}
diff --git a/go/filer/filer_embedded.go b/go/filer/filer_embedded.go
index a3b64d37b..3d3dac941 100644
--- a/go/filer/filer_embedded.go
+++ b/go/filer/filer_embedded.go
@@ -1,7 +1,7 @@
package filer
import (
- "code.google.com/p/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/operation"
"errors"
"fmt"
"path/filepath"
diff --git a/go/filer/files_in_leveldb.go b/go/filer/files_in_leveldb.go
index dbf2e6c52..41fbc74bd 100644
--- a/go/filer/files_in_leveldb.go
+++ b/go/filer/files_in_leveldb.go
@@ -2,7 +2,7 @@ package filer
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)
diff --git a/go/images/orientation.go b/go/images/orientation.go
index a7c126fc1..41ed3f0af 100644
--- a/go/images/orientation.go
+++ b/go/images/orientation.go
@@ -21,7 +21,10 @@ func FixJpgOrientation(data []byte) (oriented []byte) {
}
angle := 0
flipMode := FlipDirection(0)
- orient := tag.Int(0)
+ orient, err := tag.Int(0)
+ if err != nil {
+ return data
+ }
switch orient {
case topLeftSide:
// do nothing
diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go
index 018e1d763..4e72ad939 100644
--- a/go/operation/assign_file_id.go
+++ b/go/operation/assign_file_id.go
@@ -1,8 +1,8 @@
package operation
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"net/url"
@@ -17,7 +17,7 @@ type AssignResult struct {
Error string `json:"error,omitempty"`
}
-func Assign(server string, count int, replication string, collection string) (*AssignResult, error) {
+func Assign(server string, count int, replication string, collection string, ttl string) (*AssignResult, error) {
values := make(url.Values)
values.Add("count", strconv.Itoa(count))
if replication != "" {
@@ -26,6 +26,9 @@ func Assign(server string, count int, replication string, collection string) (*A
if collection != "" {
values.Add("collection", collection)
}
+ if ttl != "" {
+ values.Add("ttl", ttl)
+ }
jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
glog.V(2).Info("assign result :", string(jsonBlob))
if err != nil {
diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go
index e4f9d39bb..84391b634 100644
--- a/go/operation/delete_content.go
+++ b/go/operation/delete_content.go
@@ -1,7 +1,7 @@
package operation
import (
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"net/url"
diff --git a/go/operation/list_masters.go b/go/operation/list_masters.go
index 491c79f20..7d46a9ebc 100644
--- a/go/operation/list_masters.go
+++ b/go/operation/list_masters.go
@@ -1,8 +1,8 @@
package operation
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
)
diff --git a/go/operation/lookup.go b/go/operation/lookup.go
index f191bfdbf..ebf153d27 100644
--- a/go/operation/lookup.go
+++ b/go/operation/lookup.go
@@ -1,7 +1,7 @@
package operation
import (
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
_ "fmt"
diff --git a/go/operation/submit.go b/go/operation/submit.go
index 9191f7d9a..3e09c2edf 100644
--- a/go/operation/submit.go
+++ b/go/operation/submit.go
@@ -2,7 +2,7 @@ package operation
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"io"
"mime"
"os"
@@ -20,6 +20,7 @@ type FilePart struct {
ModTime int64 //in seconds
Replication string
Collection string
+ Ttl string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
}
@@ -32,12 +33,12 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, files []FilePart, replication string, collection string, maxMB int) ([]SubmitResult, error) {
+func SubmitFiles(master string, files []FilePart, replication string, collection string, ttl string, maxMB int) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
}
- ret, err := Assign(master, len(files), replication, collection)
+ ret, err := Assign(master, len(files), replication, collection, ttl)
if err != nil {
for index, _ := range files {
results[index].Error = err.Error()
@@ -112,7 +113,7 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
chunks := fi.FileSize/chunkSize + 1
fids := make([]string, 0)
for i := int64(0); i < chunks; i++ {
- id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection)
+ id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection, fi.Ttl)
if e != nil {
return 0, e
}
@@ -130,8 +131,8 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
return
}
-func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string) (fid string, size uint32, e error) {
- ret, err := Assign(master, 1, replication, collection)
+func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string, ttl string) (fid string, size uint32, e error) {
+ ret, err := Assign(master, 1, replication, collection, ttl)
if err != nil {
return "", 0, err
}
diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go
index 45ae8a648..9f00dd74d 100644
--- a/go/operation/system_message.pb.go
+++ b/go/operation/system_message.pb.go
@@ -15,12 +15,10 @@ It has these top-level messages:
package operation
import proto "code.google.com/p/goprotobuf/proto"
-import json "encoding/json"
import math "math"
-// Reference proto, json, and math imports to suppress error if they are not otherwise used.
+// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
-var _ = &json.SyntaxError{}
var _ = math.Inf
type VolumeInformationMessage struct {
@@ -33,6 +31,7 @@ type VolumeInformationMessage struct {
ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"`
Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
+ Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@@ -105,6 +104,13 @@ func (m *VolumeInformationMessage) GetVersion() uint32 {
return Default_VolumeInformationMessage_Version
}
+func (m *VolumeInformationMessage) GetTtl() uint32 {
+ if m != nil && m.Ttl != nil {
+ return *m.Ttl
+ }
+ return 0
+}
+
type JoinMessage struct {
IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"`
Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"`
diff --git a/go/operation/upload_content.go b/go/operation/upload_content.go
index b89e65ce8..38737702d 100644
--- a/go/operation/upload_content.go
+++ b/go/operation/upload_content.go
@@ -2,7 +2,7 @@ package operation
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"encoding/json"
"errors"
"fmt"
diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto
index 15574ad56..ecd4973f7 100644
--- a/go/proto/system_message.proto
+++ b/go/proto/system_message.proto
@@ -10,6 +10,7 @@ message VolumeInformationMessage {
optional bool read_only = 7;
required uint32 replica_placement = 8;
optional uint32 version = 9 [default=2];
+ optional uint32 ttl = 10;
}
message JoinMessage {
diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go
index d09a87e2a..1869a563e 100644
--- a/go/storage/cdb_map.go
+++ b/go/storage/cdb_map.go
@@ -1,7 +1,7 @@
package storage
import (
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"fmt"
diff --git a/go/storage/cdb_map_test.go b/go/storage/cdb_map_test.go
index f6a7d42ad..cff7dfa61 100644
--- a/go/storage/cdb_map_test.go
+++ b/go/storage/cdb_map_test.go
@@ -1,7 +1,7 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"math/rand"
"os"
"runtime"
diff --git a/go/storage/compact_map_perf_test.go b/go/storage/compact_map_perf_test.go
index 37b23a59f..ef43de25b 100644
--- a/go/storage/compact_map_perf_test.go
+++ b/go/storage/compact_map_perf_test.go
@@ -1,8 +1,8 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
"log"
"os"
"testing"
diff --git a/go/storage/compress.go b/go/storage/compress.go
index 846fd0714..a353c9d3a 100644
--- a/go/storage/compress.go
+++ b/go/storage/compress.go
@@ -2,7 +2,7 @@ package storage
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"compress/flate"
"compress/gzip"
"io/ioutil"
diff --git a/go/storage/crc.go b/go/storage/crc.go
index 41f7f6d00..7aa400959 100644
--- a/go/storage/crc.go
+++ b/go/storage/crc.go
@@ -1,7 +1,7 @@
package storage
import (
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/util"
"fmt"
"hash/crc32"
)
@@ -25,5 +25,5 @@ func (c CRC) Value() uint32 {
func (n *Needle) Etag() string {
bits := make([]byte, 4)
util.Uint32toBytes(bits, uint32(n.Checksum))
- return fmt.Sprintf("%x", bits)
+ return fmt.Sprintf("\"%x\"", bits)
}
diff --git a/go/storage/file_id.go b/go/storage/file_id.go
index 5fcd8c387..ec566826c 100644
--- a/go/storage/file_id.go
+++ b/go/storage/file_id.go
@@ -1,8 +1,8 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/hex"
"errors"
"strings"
diff --git a/go/storage/needle.go b/go/storage/needle.go
index 77aa70169..daede321b 100644
--- a/go/storage/needle.go
+++ b/go/storage/needle.go
@@ -1,9 +1,9 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/images"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/images"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/hex"
"errors"
"io/ioutil"
@@ -38,12 +38,13 @@ type Needle struct {
MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
+ Ttl *TTL
Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"`
}
-func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, e error) {
+func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) {
form, fe := r.MultipartReader()
if fe != nil {
glog.V(0).Infoln("MultipartReader [ERROR]", fe)
@@ -92,12 +93,13 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string
fileName = fileName[:len(fileName)-3]
}
modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
+ ttl, _ = ReadTTL(r.FormValue("ttl"))
return
}
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
fname, mimeType, isGzipped := "", "", false
n = new(Needle)
- fname, n.Data, mimeType, isGzipped, n.LastModified, e = ParseUpload(r)
+ fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, e = ParseUpload(r)
if e != nil {
return
}
@@ -116,6 +118,9 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
n.LastModified = uint64(time.Now().Unix())
}
n.SetHasLastModifiedDate()
+ if n.Ttl != EMPTY_TTL {
+ n.SetHasTtl()
+ }
if fixJpgOrientation {
loweredName := strings.ToLower(fname)
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index 6d94ee1ca..dca2e6c5d 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -1,8 +1,8 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"os"
diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go
index 835d7c270..bf452ba37 100644
--- a/go/storage/needle_read_write.go
+++ b/go/storage/needle_read_write.go
@@ -1,8 +1,8 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
"errors"
"fmt"
"io"
@@ -14,7 +14,9 @@ const (
FlagHasName = 0x02
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
+ FlagHasTtl = 0x10
LastModifiedBytesLength = 5
+ TtlBytesLength = 2
)
func (n *Needle) DiskSize() int64 {
@@ -70,6 +72,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
if n.HasLastModifiedDate() {
n.Size = n.Size + LastModifiedBytesLength
}
+ if n.HasTtl() {
+ n.Size = n.Size + TtlBytesLength
+ }
}
size = n.DataSize
util.Uint32toBytes(header[12:16], n.Size)
@@ -112,6 +117,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return
}
}
+ if n.HasTtl() {
+ n.Ttl.ToBytes(header[0:TtlBytesLength])
+ if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
+ return
+ }
+ }
}
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
@@ -194,6 +205,10 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
index = index + LastModifiedBytesLength
}
+ if index < lenBytes && n.HasTtl() {
+ n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
+ index = index + TtlBytesLength
+ }
}
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
@@ -263,3 +278,9 @@ func (n *Needle) HasLastModifiedDate() bool {
func (n *Needle) SetHasLastModifiedDate() {
n.Flags = n.Flags | FlagHasLastModifiedDate
}
+func (n *Needle) HasTtl() bool {
+ return n.Flags&FlagHasTtl > 0
+}
+func (n *Needle) SetHasTtl() {
+ n.Flags = n.Flags | FlagHasTtl
+}
diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go
index 696888cd8..c1aca52eb 100644
--- a/go/storage/replica_placement.go
+++ b/go/storage/replica_placement.go
@@ -5,10 +5,6 @@ import (
"fmt"
)
-const (
- ReplicaPlacementCount = 9
-)
-
type ReplicaPlacement struct {
SameRackCount int
DiffRackCount int
@@ -55,7 +51,3 @@ func (rp *ReplicaPlacement) String() string {
func (rp *ReplicaPlacement) GetCopyCount() int {
return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
}
-
-func (rp *ReplicaPlacement) GetReplicationLevelIndex() int {
- return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount
-}
diff --git a/go/storage/store.go b/go/storage/store.go
index a6a4f399e..e7a9dac94 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -2,9 +2,9 @@ package storage
import (
proto "code.google.com/p/goprotobuf/proto"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"fmt"
@@ -14,6 +14,10 @@ import (
"strings"
)
+const (
+ MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
+)
+
type DiskLocation struct {
Directory string
MaxVolumeCount int
@@ -83,11 +87,15 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
-func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
+ ttl, e := ReadTTL(ttlString)
+ if e != nil {
+ return e
+ }
for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 {
id_string := range_string
@@ -95,7 +103,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
- e = s.addVolume(VolumeId(id), collection, rt)
+ e = s.addVolume(VolumeId(id), collection, rt, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -107,7 +115,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), collection, rt); err != nil {
+ if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
e = err
}
}
@@ -129,6 +137,14 @@ func (s *Store) DeleteCollection(collection string) (e error) {
}
return
}
+func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
+ e = v.Destroy()
+ if e != nil {
+ return
+ }
+ delete(volumes, v.Id)
+ return
+}
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.Locations {
if v, found := location.volumes[vid]; found {
@@ -148,13 +164,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error {
+func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
- glog.V(0).Infoln("In dir", location.Directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement)
- if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement); err == nil {
+ glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
+ location.Directory, vid, collection, replicaPlacement, ttl)
+ if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -190,9 +207,9 @@ func (l *DiskLocation) loadExistingVolumes() {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.Directory, collection, vid, nil); e == nil {
+ if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
l.volumes[vid] = v
- glog.V(0).Infoln("data file", l.Directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size())
+ glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
}
}
}
@@ -240,21 +257,31 @@ func (s *Store) Join() (masterNode string, e error) {
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
- volumeMessage := &operation.VolumeInformationMessage{
- Id: proto.Uint32(uint32(k)),
- Size: proto.Uint64(uint64(v.Size())),
- Collection: proto.String(v.Collection),
- FileCount: proto.Uint64(uint64(v.nm.FileCount())),
- DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
- DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
- ReadOnly: proto.Bool(v.readOnly),
- ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
- Version: proto.Uint32(uint32(v.Version())),
- }
- volumeMessages = append(volumeMessages, volumeMessage)
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
+ if !v.expired(s.volumeSizeLimit) {
+ volumeMessage := &operation.VolumeInformationMessage{
+ Id: proto.Uint32(uint32(k)),
+ Size: proto.Uint64(uint64(v.Size())),
+ Collection: proto.String(v.Collection),
+ FileCount: proto.Uint64(uint64(v.nm.FileCount())),
+ DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
+ DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
+ ReadOnly: proto.Bool(v.readOnly),
+ ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
+ Version: proto.Uint32(uint32(v.Version())),
+ Ttl: proto.Uint32(v.Ttl.ToUint32()),
+ }
+ volumeMessages = append(volumeMessages, volumeMessage)
+ } else {
+ if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
+ s.DeleteVolume(location.volumes, v)
+ glog.V(0).Infoln("volume", v.Id, "is deleted.")
+ } else {
+ glog.V(0).Infoln("volume", v.Id, "is expired.")
+ }
+ }
}
}
diff --git a/go/storage/store_vacuum.go b/go/storage/store_vacuum.go
index 5adaa7561..3527e4f59 100644
--- a/go/storage/store_vacuum.go
+++ b/go/storage/store_vacuum.go
@@ -1,7 +1,7 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"fmt"
"strconv"
)
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 7bd8e7467..de79e9107 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -2,7 +2,7 @@ package storage
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"errors"
"fmt"
"io"
@@ -12,22 +12,6 @@ import (
"time"
)
-const (
- SuperBlockSize = 8
-)
-
-type SuperBlock struct {
- Version Version
- ReplicaPlacement *ReplicaPlacement
-}
-
-func (s *SuperBlock) Bytes() []byte {
- header := make([]byte, SuperBlockSize)
- header[0] = byte(s.Version)
- header[1] = s.ReplicaPlacement.Byte()
- return header
-}
-
type Volume struct {
Id VolumeId
dir string
@@ -38,12 +22,13 @@ type Volume struct {
SuperBlock
- accessLock sync.Mutex
+ accessLock sync.Mutex
+ lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
+ v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
e = v.load(true, true)
return
}
@@ -65,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
var e error
fileName := v.FileName()
- if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
+ if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
if canWrite {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ v.lastModifiedTime = uint64(modifiedTime.Unix())
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
v.dataFile, e = os.Open(fileName + ".dat")
@@ -122,7 +108,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
return e
}
func (v *Volume) Version() Version {
- return v.SuperBlock.Version
+ return v.SuperBlock.Version()
}
func (v *Volume) Size() int64 {
stat, e := v.dataFile.Stat()
@@ -138,44 +124,6 @@ func (v *Volume) Close() {
v.nm.Close()
_ = v.dataFile.Close()
}
-func (v *Volume) maybeWriteSuperBlock() error {
- stat, e := v.dataFile.Stat()
- if e != nil {
- glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
- return e
- }
- if stat.Size() == 0 {
- v.SuperBlock.Version = CurrentVersion
- _, e = v.dataFile.Write(v.SuperBlock.Bytes())
- if e != nil && os.IsPermission(e) {
- //read-only, but zero length - recreate it!
- if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
- if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
- v.readOnly = false
- }
- }
- }
- }
- return e
-}
-func (v *Volume) readSuperBlock() (err error) {
- if _, err = v.dataFile.Seek(0, 0); err != nil {
- return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
- }
- header := make([]byte, SuperBlockSize)
- if _, e := v.dataFile.Read(header); e != nil {
- return fmt.Errorf("cannot read superblock: %s", e.Error())
- }
- v.SuperBlock, err = ParseSuperBlock(header)
- return err
-}
-func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
- superBlock.Version = Version(header[0])
- if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
- err = fmt.Errorf("cannot read replica type: %s", err.Error())
- }
- return
-}
func (v *Volume) NeedToReplicate() bool {
return v.ReplicaPlacement.GetCopyCount() > 1
}
@@ -246,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
}
}
+ if v.lastModifiedTime < n.LastModified {
+ v.lastModifiedTime = n.LastModified
+ }
return
}
@@ -275,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
func (v *Volume) read(n *Needle) (int, error) {
nv, ok := v.nm.Get(n.Id)
- if ok && nv.Offset > 0 {
- return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if !ok || nv.Offset == 0 {
+ return -1, errors.New("Not Found")
+ }
+ bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if err != nil {
+ return bytesRead, err
+ }
+ if !n.HasTtl() {
+ return bytesRead, err
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
+ return bytesRead, nil
}
return -1, errors.New("Not Found")
}
@@ -397,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
}
return true
}
+
+// volume is expired if modified time + volume ttl < now
+// except when volume is empty
+// or when the volume does not have a ttl
+// or when volumeSizeLimit is 0 when server just starts
+func (v *Volume) expired(volumeSizeLimit uint64) bool {
+ if volumeSizeLimit == 0 {
+ //skip if we don't know size limit
+ return false
+ }
+ if v.ContentSize() == 0 {
+ return false
+ }
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
+ livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
+ glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ if int64(v.Ttl.Minutes()) < livedMinutes {
+ return true
+ }
+ return false
+}
+
+// wait either maxDelayMinutes or 10% of ttl minutes
+func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ removalDelay := v.Ttl.Minutes() / 10
+ if removalDelay > maxDelayMinutes {
+ removalDelay = maxDelayMinutes
+ }
+
+ if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
+ return true
+ }
+ return false
+}
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index 165af1a19..6410c1784 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -1,13 +1,14 @@
package storage
import (
- "code.google.com/p/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/operation"
)
type VolumeInfo struct {
Id VolumeId
Size uint64
ReplicaPlacement *ReplicaPlacement
+ Ttl *TTL
Collection string
Version Version
FileCount int
@@ -32,5 +33,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er
return vi, e
}
vi.ReplicaPlacement = rp
+ vi.Ttl = LoadTTLFromUint32(*m.Ttl)
return vi, nil
}
diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go
new file mode 100644
index 000000000..a7e86b1c3
--- /dev/null
+++ b/go/storage/volume_super_block.go
@@ -0,0 +1,75 @@
+package storage
+
+import (
+ "github.com/chrislusf/weed-fs/go/glog"
+ "fmt"
+ "os"
+)
+
+const (
+ SuperBlockSize = 8
+)
+
+/*
+* Super block currently has 8 bytes allocated for each volume.
+* Byte 0: version, 1 or 2
+* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
+* Byte 2 and byte 3: Time to live. See TTL for definition
+* Rest bytes: Reserved
+ */
+type SuperBlock struct {
+ version Version
+ ReplicaPlacement *ReplicaPlacement
+ Ttl *TTL
+}
+
+func (s *SuperBlock) Version() Version {
+ return s.version
+}
+func (s *SuperBlock) Bytes() []byte {
+ header := make([]byte, SuperBlockSize)
+ header[0] = byte(s.version)
+ header[1] = s.ReplicaPlacement.Byte()
+ s.Ttl.ToBytes(header[2:4])
+ return header
+}
+
+func (v *Volume) maybeWriteSuperBlock() error {
+ stat, e := v.dataFile.Stat()
+ if e != nil {
+ glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
+ return e
+ }
+ if stat.Size() == 0 {
+ v.SuperBlock.version = CurrentVersion
+ _, e = v.dataFile.Write(v.SuperBlock.Bytes())
+ if e != nil && os.IsPermission(e) {
+ //read-only, but zero length - recreate it!
+ if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
+ if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
+ v.readOnly = false
+ }
+ }
+ }
+ }
+ return e
+}
+func (v *Volume) readSuperBlock() (err error) {
+ if _, err = v.dataFile.Seek(0, 0); err != nil {
+ return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
+ }
+ header := make([]byte, SuperBlockSize)
+ if _, e := v.dataFile.Read(header); e != nil {
+ return fmt.Errorf("cannot read superblock: %s", e.Error())
+ }
+ v.SuperBlock, err = ParseSuperBlock(header)
+ return err
+}
+func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
+ superBlock.version = Version(header[0])
+ if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
+ err = fmt.Errorf("cannot read replica type: %s", err.Error())
+ }
+ superBlock.Ttl = LoadTTLFromBytes(header[2:4])
+ return
+}
diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go
new file mode 100644
index 000000000..13db4b194
--- /dev/null
+++ b/go/storage/volume_super_block_test.go
@@ -0,0 +1,23 @@
+package storage
+
+import (
+ "testing"
+)
+
+func TestSuperBlockReadWrite(t *testing.T) {
+ rp, _ := NewReplicaPlacementFromByte(byte(001))
+ ttl, _ := ReadTTL("15d")
+ s := &SuperBlock{
+ version: CurrentVersion,
+ ReplicaPlacement: rp,
+ Ttl: ttl,
+ }
+
+ bytes := s.Bytes()
+
+ if !(bytes[2] == 15 && bytes[3] == Day) {
+ println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
+ t.Fail()
+ }
+
+}
diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go
new file mode 100644
index 000000000..459ee55ba
--- /dev/null
+++ b/go/storage/volume_ttl.go
@@ -0,0 +1,135 @@
+package storage
+
+import (
+ "strconv"
+)
+
+const (
+ //stored unit types
+ Empty byte = iota
+ Minute
+ Hour
+ Day
+ Week
+ Month
+ Year
+)
+
+type TTL struct {
+ count byte
+ unit byte
+}
+
+var EMPTY_TTL = &TTL{}
+
+// translate a readable ttl to internal ttl
+// Supports format example:
+// 3m: 3 minutes
+// 4h: 4 hours
+// 5d: 5 days
+// 6w: 6 weeks
+// 7M: 7 months
+// 8y: 8 years
+func ReadTTL(ttlString string) (*TTL, error) {
+ if ttlString == "" {
+ return EMPTY_TTL, nil
+ }
+ ttlBytes := []byte(ttlString)
+ unitByte := ttlBytes[len(ttlBytes)-1]
+ countBytes := ttlBytes[0 : len(ttlBytes)-1]
+ if '0' <= unitByte && unitByte <= '9' {
+ countBytes = ttlBytes
+ unitByte = 'm'
+ }
+ count, err := strconv.Atoi(string(countBytes))
+ unit := toStoredByte(unitByte)
+ return &TTL{count: byte(count), unit: unit}, err
+}
+
+// read stored bytes to a ttl
+func LoadTTLFromBytes(input []byte) (t *TTL) {
+ return &TTL{count: input[0], unit: input[1]}
+}
+
+// read stored bytes to a ttl
+func LoadTTLFromUint32(ttl uint32) (t *TTL) {
+ input := make([]byte, 2)
+ input[1] = byte(ttl)
+ input[0] = byte(ttl >> 8)
+ return LoadTTLFromBytes(input)
+}
+
+// save stored bytes to an output with 2 bytes
+func (t TTL) ToBytes(output []byte) {
+ output[0] = t.count
+ output[1] = t.unit
+}
+
+func (t TTL) ToUint32() (output uint32) {
+ output = uint32(t.count) << 8
+ output += uint32(t.unit)
+ return output
+}
+
+func (t TTL) String() string {
+ if t.count == 0 {
+ return ""
+ }
+ if t.unit == Empty {
+ return ""
+ }
+ countString := strconv.Itoa(int(t.count))
+ switch t.unit {
+ case Minute:
+ return countString + "m"
+ case Hour:
+ return countString + "h"
+ case Day:
+ return countString + "d"
+ case Week:
+ return countString + "w"
+ case Month:
+ return countString + "M"
+ case Year:
+ return countString + "y"
+ }
+ return ""
+}
+
+func toStoredByte(readableUnitByte byte) byte {
+ switch readableUnitByte {
+ case 'm':
+ return Minute
+ case 'h':
+ return Hour
+ case 'd':
+ return Day
+ case 'w':
+ return Week
+ case 'M':
+ return Month
+ case 'y':
+ return Year
+ }
+ return 0
+}
+
+func (t TTL) Minutes() uint32 {
+ switch t.unit {
+ case Empty:
+ return 0
+ case Minute:
+ return uint32(t.count)
+ case Hour:
+ return uint32(t.count) * 60
+ case Day:
+ return uint32(t.count) * 60 * 24
+ case Week:
+ return uint32(t.count) * 60 * 24 * 7
+ case Month:
+ return uint32(t.count) * 60 * 24 * 31
+ case Year:
+ return uint32(t.count) * 60 * 24 * 365
+ }
+ return 0
+}
diff --git a/go/storage/volume_ttl_test.go b/go/storage/volume_ttl_test.go
new file mode 100644
index 000000000..216469a4c
--- /dev/null
+++ b/go/storage/volume_ttl_test.go
@@ -0,0 +1,60 @@
+package storage
+
+import (
+ "testing"
+)
+
+func TestTTLReadWrite(t *testing.T) {
+ ttl, _ := ReadTTL("")
+ if ttl.Minutes() != 0 {
+ t.Errorf("empty ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("9")
+ if ttl.Minutes() != 9 {
+ t.Errorf("9 ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("8m")
+ if ttl.Minutes() != 8 {
+ t.Errorf("8m ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5h")
+ if ttl.Minutes() != 300 {
+ t.Errorf("5h ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5d")
+ if ttl.Minutes() != 5*24*60 {
+ t.Errorf("5d ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5w")
+ if ttl.Minutes() != 5*7*24*60 {
+ t.Errorf("5w ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5M")
+ if ttl.Minutes() != 5*31*24*60 {
+ t.Errorf("5M ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5y")
+ if ttl.Minutes() != 5*365*24*60 {
+ t.Errorf("5y ttl:%v", ttl)
+ }
+
+ output := make([]byte, 2)
+ ttl.ToBytes(output)
+ ttl2 := LoadTTLFromBytes(output)
+ if ttl.Minutes() != ttl2.Minutes() {
+ t.Errorf("ttl:%v ttl2:%v", ttl, ttl2)
+ }
+
+ ttl3 := LoadTTLFromUint32(ttl.ToUint32())
+ if ttl.Minutes() != ttl3.Minutes() {
+ t.Errorf("ttl:%v ttl3:%v", ttl, ttl3)
+ }
+
+}
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
index 7d2a38cb8..b348434d2 100644
--- a/go/storage/volume_vacuum.go
+++ b/go/storage/volume_vacuum.go
@@ -1,10 +1,10 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"fmt"
"os"
- _ "time"
+ "time"
)
func (v *Volume) garbageLevel() float64 {
@@ -13,9 +13,10 @@ func (v *Volume) garbageLevel() float64 {
func (v *Volume) Compact() error {
glog.V(3).Infof("Compacting ...")
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- glog.V(3).Infof("Got Compaction lock...")
+ //no need to lock for copy on write
+ //v.accessLock.Lock()
+ //defer v.accessLock.Unlock()
+ //glog.V(3).Infof("Got Compaction lock...")
filePath := v.FileName()
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
@@ -59,10 +60,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
nm := NewNeedleMap(idx)
new_offset := int64(SuperBlockSize)
+ now := uint64(time.Now().Unix())
+
err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
_, err = dst.Write(superBlock.Bytes())
return err
}, true, func(n *Needle, offset int64) error {
+ if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+ return nil
+ }
nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
diff --git a/go/tools/read_index.go b/go/tools/read_index.go
index a958de410..b99c5b6b8 100644
--- a/go/tools/read_index.go
+++ b/go/tools/read_index.go
@@ -1,7 +1,7 @@
package main
import (
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/storage"
"flag"
"fmt"
"log"
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
index 77b4ac508..6562e9ac5 100644
--- a/go/topology/allocate_volume.go
+++ b/go/topology/allocate_volume.go
@@ -1,8 +1,8 @@
package topology
import (
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"net/url"
@@ -12,11 +12,12 @@ type AllocateVolumeResult struct {
Error string
}
-func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
+func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
values := make(url.Values)
values.Add("volume", vid.String())
- values.Add("collection", collection)
- values.Add("replication", rp.String())
+ values.Add("collection", option.Collection)
+ values.Add("replication", option.ReplicaPlacement.String())
+ values.Add("ttl", option.Ttl.String())
jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
if err != nil {
return err
diff --git a/go/topology/cluster_commands.go b/go/topology/cluster_commands.go
index 703435173..cafc52c76 100644
--- a/go/topology/cluster_commands.go
+++ b/go/topology/cluster_commands.go
@@ -1,8 +1,8 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"github.com/goraft/raft"
)
diff --git a/go/topology/collection.go b/go/topology/collection.go
index b21122d22..506f43fbf 100644
--- a/go/topology/collection.go
+++ b/go/topology/collection.go
@@ -1,33 +1,34 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/storage"
)
type Collection struct {
Name string
volumeSizeLimit uint64
- replicaType2VolumeLayout []*VolumeLayout
+ storageType2VolumeLayout map[string]*VolumeLayout
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
- c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount)
+ c.storageType2VolumeLayout = make(map[string]*VolumeLayout)
return c
}
-func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout {
- replicaPlacementIndex := rp.GetReplicationLevelIndex()
- if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil {
- glog.V(0).Infoln("collection", c.Name, "adding replication type", rp)
- c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit)
+func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+ keyString := rp.String()
+ if ttl != nil {
+ keyString += ttl.String()
}
- return c.replicaType2VolumeLayout[replicaPlacementIndex]
+ if c.storageType2VolumeLayout[keyString] == nil {
+ c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
+ }
+ return c.storageType2VolumeLayout[keyString]
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range c.replicaType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
if list := vl.Lookup(vid); list != nil {
return list
@@ -38,7 +39,7 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
}
func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
- for _, vl := range c.replicaType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
if list := vl.ListVolumeServers(); list != nil {
nodes = append(nodes, list...)
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index ae80e08bb..c3b90470f 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -1,8 +1,8 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"strconv"
)
@@ -38,15 +38,16 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
}
}
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
- for vid, _ := range dn.volumes {
+ for vid, v := range dn.volumes {
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
delete(dn.volumes, vid)
+ deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
dn.UpAdjustActiveVolumeCountDelta(-1)
}
@@ -54,6 +55,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
for _, v := range actualVolumes {
dn.AddOrUpdateVolume(v)
}
+ return
}
func (dn *DataNode) GetDataCenter() *DataCenter {
diff --git a/go/topology/node.go b/go/topology/node.go
index c52414008..54118802e 100644
--- a/go/topology/node.go
+++ b/go/topology/node.go
@@ -1,8 +1,8 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"errors"
"math/rand"
"strings"
diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go
index a982cebe5..6ea019bd8 100644
--- a/go/topology/store_replicate.go
+++ b/go/topology/store_replicate.go
@@ -2,10 +2,10 @@ package topology
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
"net/http"
"strconv"
)
diff --git a/go/topology/topology.go b/go/topology/topology.go
index f1daffb53..c90e8de0b 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -1,10 +1,10 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/sequence"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/sequence"
+ "github.com/chrislusf/weed-fs/go/storage"
"errors"
"github.com/goraft/raft"
"io/ioutil"
@@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
}
func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
- vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option)
+ vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
@@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
_, ok := t.collectionMap[collectionName]
if !ok {
t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
- return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp)
+ return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl)
}
func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
@@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) {
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
+}
+func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
+ glog.Infof("removing volume info:%+v", v)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
}
-func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
+func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
t.Sequence.SetMax(*joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
@@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
- dn.UpdateVolumes(volumeInfos)
+ deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
+ for _, v := range deletedVolumes {
+ t.UnRegisterVolumeLayout(v, dn)
+ }
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 7398ff9bf..eb4491484 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -1,8 +1,8 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"math/rand"
"time"
)
@@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
- vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement)
+ vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
@@ -55,7 +55,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
- vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@@ -65,7 +65,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
- vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
index f66d4c251..d6400c988 100644
--- a/go/topology/topology_map.go
+++ b/go/topology/topology_map.go
@@ -14,7 +14,7 @@ func (t *Topology) ToMap() interface{} {
m["DataCenters"] = dcs
var layouts []interface{}
for _, c := range t.collectionMap {
- for _, layout := range c.replicaType2VolumeLayout {
+ for _, layout := range c.storageType2VolumeLayout {
if layout != nil {
tmp := layout.ToMap()
tmp["collection"] = c.Name
diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go
index a1d6d2564..72846f20b 100644
--- a/go/topology/topology_vacuum.go
+++ b/go/topology/topology_vacuum.go
@@ -1,9 +1,9 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"net/url"
@@ -80,7 +80,7 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
}
func (t *Topology) Vacuum(garbageThreshold string) int {
for _, c := range t.collectionMap {
- for _, vl := range c.replicaType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
for vid, locationlist := range vl.vid2location {
if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
index 4965e3ba0..2859d3992 100644
--- a/go/topology/volume_growth.go
+++ b/go/topology/volume_growth.go
@@ -1,8 +1,8 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"fmt"
"math/rand"
"sync"
@@ -19,6 +19,7 @@ This package is created to resolve these replica placement issues:
type VolumeGrowOption struct {
Collection string
ReplicaPlacement *storage.ReplicaPlacement
+ Ttl *storage.TTL
DataCenter string
Rack string
DataNode string
@@ -184,8 +185,15 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
- if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil {
- vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion}
+ if err := AllocateVolume(server, vid, option); err == nil {
+ vi := storage.VolumeInfo{
+ Id: vid,
+ Size: 0,
+ Collection: option.Collection,
+ ReplicaPlacement: option.ReplicaPlacement,
+ Ttl: option.Ttl,
+ Version: storage.CurrentVersion,
+ }
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server)
diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go
index d2913ef5c..5581c87ce 100644
--- a/go/topology/volume_growth_test.go
+++ b/go/topology/volume_growth_test.go
@@ -1,8 +1,8 @@
package topology
import (
- "code.google.com/p/weed-fs/go/sequence"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/sequence"
+ "github.com/chrislusf/weed-fs/go/storage"
"encoding/json"
"fmt"
"testing"
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 538acb54c..7bb0cf7e3 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -1,8 +1,8 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"errors"
"math/rand"
"sync"
@@ -11,15 +11,17 @@ import (
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
rp *storage.ReplicaPlacement
+ ttl *storage.TTL
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64
accessLock sync.Mutex
}
-func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{
rp: rp,
+ ttl: ttl,
vid2location: make(map[storage.VolumeId]*VolumeLocationList),
writables: *new([]storage.VolumeId),
volumeSizeLimit: volumeSizeLimit,
@@ -42,6 +44,14 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
}
+func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ vl.removeFromWritable(v.Id)
+ delete(vl.vid2location, v.Id)
+}
+
func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) {
for _, id := range vl.writables {
if vid == id {
@@ -192,6 +202,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
func (vl *VolumeLayout) ToMap() map[string]interface{} {
m := make(map[string]interface{})
m["replication"] = vl.rp.String()
+ m["ttl"] = vl.ttl.String()
m["writables"] = vl.writables
//m["locations"] = vl.vid2location
return m
diff --git a/go/util/bytes.go b/go/util/bytes.go
index 6cc3d7018..dfa4ae665 100644
--- a/go/util/bytes.go
+++ b/go/util/bytes.go
@@ -1,5 +1,7 @@
package util
+// big endian
+
func BytesToUint64(b []byte) (v uint64) {
length := uint(len(b))
for i := uint(0); i < length-1; i++ {
@@ -18,6 +20,12 @@ func BytesToUint32(b []byte) (v uint32) {
v += uint32(b[length-1])
return
}
+func BytesToUint16(b []byte) (v uint16) {
+ v += uint16(b[0])
+ v <<= 8
+ v += uint16(b[1])
+ return
+}
func Uint64toBytes(b []byte, v uint64) {
for i := uint(0); i < 8; i++ {
b[7-i] = byte(v >> (i * 8))
@@ -28,6 +36,10 @@ func Uint32toBytes(b []byte, v uint32) {
b[3-i] = byte(v >> (i * 8))
}
}
+func Uint16toBytes(b []byte, v uint16) {
+ b[0] = byte(v >> 8)
+ b[1] = byte(v)
+}
func Uint8toBytes(b []byte, v uint8) {
b[0] = byte(v)
}
diff --git a/go/util/config.go b/go/util/config.go
index 9a1ac680b..050fd0e64 100644
--- a/go/util/config.go
+++ b/go/util/config.go
@@ -10,7 +10,7 @@ package util
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"encoding/json"
"os"
)
diff --git a/go/util/constants.go b/go/util/constants.go
index 196b4a405..db1ca38e5 100644
--- a/go/util/constants.go
+++ b/go/util/constants.go
@@ -3,5 +3,5 @@ package util
import ()
const (
- VERSION = "0.63 beta"
+ VERSION = "0.64"
)
diff --git a/go/util/file_util.go b/go/util/file_util.go
index 9f3354011..412d98458 100644
--- a/go/util/file_util.go
+++ b/go/util/file_util.go
@@ -2,7 +2,7 @@ package util
import (
"bufio"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"errors"
"os"
)
diff --git a/go/util/net_timeout.go b/go/util/net_timeout.go
index a6cc81c99..eb80822b5 100644
--- a/go/util/net_timeout.go
+++ b/go/util/net_timeout.go
@@ -1,7 +1,7 @@
package util
import (
- "code.google.com/p/weed-fs/go/stats"
+ "github.com/chrislusf/weed-fs/go/stats"
"net"
"time"
)
diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go
index eab923751..fec8472e5 100644
--- a/go/weed/benchmark.go
+++ b/go/weed/benchmark.go
@@ -2,9 +2,9 @@ package main
import (
"bufio"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"math"
@@ -98,7 +98,7 @@ func init() {
}
func runbenchmark(cmd *Command, args []string) bool {
- fmt.Printf("This is Weed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
+ fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *b.cpuprofile != "" {
f, err := os.Create(*b.cpuprofile)
if err != nil {
@@ -201,7 +201,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
start := time.Now()
fileSize := int64(*b.fileSize + rand.Intn(64))
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
- if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil {
+ if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
if _, ok := serverLimitChan[fp.Server]; !ok {
serverLimitChan[fp.Server] = make(chan bool, 7)
diff --git a/go/weed/compact.go b/go/weed/compact.go
index 580f3f98d..a99e6c93e 100644
--- a/go/weed/compact.go
+++ b/go/weed/compact.go
@@ -1,8 +1,8 @@
package main
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
)
func init() {
@@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
}
vid := storage.VolumeId(*compactVolumeId)
- v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil)
+ v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil, nil)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
diff --git a/go/weed/download.go b/go/weed/download.go
index 4309fe5e0..c30d17915 100644
--- a/go/weed/download.go
+++ b/go/weed/download.go
@@ -1,8 +1,8 @@
package main
import (
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"io/ioutil"
diff --git a/go/weed/export.go b/go/weed/export.go
index 3f8ff85bb..81bc21f6e 100644
--- a/go/weed/export.go
+++ b/go/weed/export.go
@@ -3,8 +3,8 @@ package main
import (
"archive/tar"
"bytes"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"fmt"
"os"
"path"
@@ -27,7 +27,7 @@ var cmdExport = &Command{
UsageLine: "export -dir=/tmp -volumeId=234 -o=/dir/name.tar -fileNameFormat={{.Name}}",
Short: "list or export files from one volume data file",
Long: `List all files in a volume, or Export all files in a volume to a tar file if the output is specified.
-
+
The format of file name in the tar file can be customized. Default is {{.Mime}}/{{.Id}}:{{.Name}}. Also available is {{.Key}}.
`,
@@ -100,7 +100,7 @@ func runExport(cmd *Command, args []string) bool {
var version storage.Version
err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, func(superBlock storage.SuperBlock) error {
- version = superBlock.Version
+ version = superBlock.Version()
return nil
}, true, func(n *storage.Needle, offset int64) error {
nv, ok := nm.Get(n.Id)
diff --git a/go/weed/filer.go b/go/weed/filer.go
index d7d028d50..7dbecb4d0 100644
--- a/go/weed/filer.go
+++ b/go/weed/filer.go
@@ -1,9 +1,9 @@
package main
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
- "code.google.com/p/weed-fs/go/weed/weed_server"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/weed/weed_server"
"net/http"
"os"
"strconv"
@@ -63,7 +63,7 @@ func runFiler(cmd *Command, args []string) bool {
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
}
- glog.V(0).Infoln("Start Weed Filer", util.VERSION, "at port", strconv.Itoa(*f.port))
+ glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*f.port))
filerListener, e := util.NewListener(
":"+strconv.Itoa(*f.port),
time.Duration(10)*time.Second,
diff --git a/go/weed/fix.go b/go/weed/fix.go
index 02147d796..ad573875a 100644
--- a/go/weed/fix.go
+++ b/go/weed/fix.go
@@ -1,8 +1,8 @@
package main
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"os"
"path"
"strconv"
diff --git a/go/weed/master.go b/go/weed/master.go
index b95ca5cb1..6617c8ca6 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -1,9 +1,9 @@
package main
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
- "code.google.com/p/weed-fs/go/weed/weed_server"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/weed/weed_server"
"github.com/gorilla/mux"
"net/http"
"os"
@@ -63,7 +63,7 @@ func runMaster(cmd *Command, args []string) bool {
listeningAddress := *masterIp + ":" + strconv.Itoa(*mport)
- glog.V(0).Infoln("Start Weed Master", util.VERSION, "at", listeningAddress)
+ glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress)
listener, e := util.NewListener(listeningAddress, time.Duration(*mTimeout)*time.Second)
if e != nil {
diff --git a/go/weed/mount_std.go b/go/weed/mount_std.go
index 9376b3f2e..e5fc0986c 100644
--- a/go/weed/mount_std.go
+++ b/go/weed/mount_std.go
@@ -5,17 +5,17 @@ package main
import (
"bazil.org/fuse"
"bazil.org/fuse/fs"
- "code.google.com/p/weed-fs/go/filer"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/filer"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
"fmt"
"os"
"runtime"
)
func runMount(cmd *Command, args []string) bool {
- fmt.Printf("This is Weed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
+ fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *mountOptions.dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false
diff --git a/go/weed/server.go b/go/weed/server.go
index 8b0372159..1d854d641 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -1,9 +1,9 @@
package main
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
- "code.google.com/p/weed-fs/go/weed/weed_server"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/weed/weed_server"
"github.com/gorilla/mux"
"net/http"
"os"
@@ -48,6 +48,7 @@ var cmdServer = &Command{
var (
serverIp = cmdServer.Flag.String("ip", "", "ip or server name")
serverPublicIp = cmdServer.Flag.String("publicIp", "", "ip or server name")
+ serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds")
serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
@@ -152,7 +153,7 @@ func runServer(cmd *Command, args []string) bool {
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
}
- glog.V(0).Infoln("Start Weed Filer", util.VERSION, "at port", strconv.Itoa(*filerOptions.port))
+ glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*filerOptions.port))
filerListener, e := util.NewListener(
":"+strconv.Itoa(*filerOptions.port),
time.Duration(10)*time.Second,
@@ -178,8 +179,8 @@ func runServer(cmd *Command, args []string) bool {
*masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, serverWhiteList,
)
- glog.V(0).Infoln("Start Weed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort))
- masterListener, e := util.NewListener(*serverIp+":"+strconv.Itoa(*masterPort), time.Duration(*serverTimeout)*time.Second)
+ glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort))
+ masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), time.Duration(*serverTimeout)*time.Second)
if e != nil {
glog.Fatalf(e.Error())
}
@@ -211,9 +212,9 @@ func runServer(cmd *Command, args []string) bool {
*volumeFixJpgOrientation,
)
- glog.V(0).Infoln("Start Weed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort))
+ glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort))
volumeListener, e := util.NewListener(
- *serverIp+":"+strconv.Itoa(*volumePort),
+ *serverBindIp+":"+strconv.Itoa(*volumePort),
time.Duration(*serverTimeout)*time.Second,
)
if e != nil {
diff --git a/go/weed/shell.go b/go/weed/shell.go
index 6a3331284..c8043e0dd 100644
--- a/go/weed/shell.go
+++ b/go/weed/shell.go
@@ -2,7 +2,7 @@ package main
import (
"bufio"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"fmt"
"os"
)
diff --git a/go/weed/upload.go b/go/weed/upload.go
index b59313a2a..4eae4d274 100644
--- a/go/weed/upload.go
+++ b/go/weed/upload.go
@@ -1,7 +1,7 @@
package main
import (
- "code.google.com/p/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/operation"
"encoding/json"
"fmt"
"os"
@@ -12,6 +12,7 @@ var (
uploadReplication *string
uploadCollection *string
uploadDir *string
+ uploadTtl *string
include *string
maxMB *int
)
@@ -24,6 +25,7 @@ func init() {
include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
uploadReplication = cmdUpload.Flag.String("replication", "", "replication type")
uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name")
+ uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
}
@@ -67,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB)
+ results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -84,7 +86,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
- results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB)
+ results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}
diff --git a/go/weed/version.go b/go/weed/version.go
index 2e0a59822..63441509e 100644
--- a/go/weed/version.go
+++ b/go/weed/version.go
@@ -1,7 +1,7 @@
package main
import (
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/util"
"fmt"
"runtime"
)
@@ -9,8 +9,8 @@ import (
var cmdVersion = &Command{
Run: runVersion,
UsageLine: "version",
- Short: "print Weed File System version",
- Long: `Version prints the Weed File System version`,
+ Short: "print Seaweed File System version",
+ Long: `Version prints the Seaweed File System version`,
}
func runVersion(cmd *Command, args []string) bool {
diff --git a/go/weed/volume.go b/go/weed/volume.go
index 8aa52c43d..17d03f0c5 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -1,9 +1,9 @@
package main
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
- "code.google.com/p/weed-fs/go/weed/weed_server"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/weed/weed_server"
"net/http"
"os"
"runtime"
@@ -30,6 +30,7 @@ var (
maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
ip = cmdVolume.Flag.String("ip", "", "ip or server name")
publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>")
+ bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds")
@@ -84,9 +85,9 @@ func runVolume(cmd *Command, args []string) bool {
*fixJpgOrientation,
)
- listeningAddress := *ip + ":" + strconv.Itoa(*vport)
+ listeningAddress := *bindIp + ":" + strconv.Itoa(*vport)
- glog.V(0).Infoln("Start Weed volume server", util.VERSION, "at", listeningAddress)
+ glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress)
listener, e := util.NewListener(listeningAddress, time.Duration(*vTimeout)*time.Second)
if e != nil {
diff --git a/go/weed/volume_test.go b/go/weed/volume_test.go
index 2499d8543..764362a2b 100644
--- a/go/weed/volume_test.go
+++ b/go/weed/volume_test.go
@@ -1,7 +1,7 @@
package main
import (
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"net/http"
"testing"
"time"
diff --git a/go/weed/weed.go b/go/weed/weed.go
index c739d8e93..c1f5a72de 100644
--- a/go/weed/weed.go
+++ b/go/weed/weed.go
@@ -1,7 +1,7 @@
package main
import (
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"flag"
"fmt"
"io"
@@ -90,7 +90,7 @@ func main() {
}
var usageTemplate = `
-Weed File System : store billions of files and serve them fast!
+Seaweed File System : store billions of files and serve them fast!
Usage:
diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go
index a547d7462..816107dc5 100644
--- a/go/weed/weed_server/common.go
+++ b/go/weed/weed_server/common.go
@@ -2,11 +2,11 @@ package weed_server
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/stats"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/stats"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"fmt"
"net"
@@ -99,14 +99,14 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
- fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r)
+ fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r)
if pe != nil {
writeJsonError(w, r, pe)
return
}
debug("assigning file id for", fname)
- assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"))
+ assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl"))
if ae != nil {
writeJsonError(w, r, ae)
return
diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go
index e56cb5964..5ff0ed986 100644
--- a/go/weed/weed_server/filer_server.go
+++ b/go/weed/weed_server/filer_server.go
@@ -1,8 +1,8 @@
package weed_server
import (
- "code.google.com/p/weed-fs/go/filer"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/filer"
+ "github.com/chrislusf/weed-fs/go/glog"
"net/http"
"strconv"
)
diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go
index ab30aaaed..e36e7c310 100644
--- a/go/weed/weed_server/filer_server_handlers.go
+++ b/go/weed/weed_server/filer_server_handlers.go
@@ -1,9 +1,9 @@
package weed_server
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"github.com/syndtr/goleveldb/leveldb"
@@ -103,12 +103,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
for k, v := range resp.Header {
w.Header()[k] = v
}
+ w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
- assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection)
+ assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection, query.Get("ttl"))
if ae != nil {
glog.V(0).Infoln("failing to assign a file id", ae.Error())
writeJsonError(w, r, ae)
@@ -130,14 +131,14 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
resp, do_err := util.Do(request)
if do_err != nil {
- glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
+ glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
writeJsonError(w, r, do_err)
return
}
defer resp.Body.Close()
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
- glog.V(0).Infoln("failing to upload to volume server", ra_err.Error())
+ glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
writeJsonError(w, r, ra_err)
return
}
@@ -145,12 +146,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
var ret operation.UploadResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
- glog.V(0).Infoln("failing to read upload resonse", string(resp_body))
+ glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
writeJsonError(w, r, unmarshal_err)
return
}
if ret.Error != "" {
- glog.V(0).Infoln("failing to post to volume server", ret.Error)
+ glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
writeJsonError(w, r, errors.New(ret.Error))
return
}
@@ -168,7 +169,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
- glog.V(0).Infoln("failing to write to filer server", db_err.Error())
+ glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error())
writeJsonError(w, r, db_err)
return
}
diff --git a/go/weed/weed_server/filer_server_handlers_admin.go b/go/weed/weed_server/filer_server_handlers_admin.go
index b4b8b7221..ff52dff24 100644
--- a/go/weed/weed_server/filer_server_handlers_admin.go
+++ b/go/weed/weed_server/filer_server_handlers_admin.go
@@ -1,7 +1,7 @@
package weed_server
import (
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"net/http"
)
diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go
index a89959fa4..401f6cfdb 100644
--- a/go/weed/weed_server/master_server.go
+++ b/go/weed/weed_server/master_server.go
@@ -1,10 +1,10 @@
package weed_server
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/sequence"
- "code.google.com/p/weed-fs/go/topology"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/sequence"
+ "github.com/chrislusf/weed-fs/go/topology"
+ "github.com/chrislusf/weed-fs/go/util"
"github.com/goraft/raft"
"github.com/gorilla/mux"
"net/http"
diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go
index 1b58e6e73..93e9e7d9a 100644
--- a/go/weed/weed_server/master_server_handlers.go
+++ b/go/weed/weed_server/master_server_handlers.go
@@ -1,9 +1,9 @@
package weed_server
import (
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/stats"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/stats"
+ "github.com/chrislusf/weed-fs/go/storage"
"net/http"
"strconv"
"strings"
diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go
index d50075fd5..c9a8020c2 100644
--- a/go/weed/weed_server/master_server_handlers_admin.go
+++ b/go/weed/weed_server/master_server_handlers_admin.go
@@ -2,11 +2,11 @@ package weed_server
import (
proto "code.google.com/p/goprotobuf/proto"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/topology"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/topology"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"io/ioutil"
@@ -55,7 +55,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
}
}
- ms.Topo.RegisterVolumes(joinMessage)
+ ms.Topo.ProcessJoinMessage(joinMessage)
writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
}
@@ -144,7 +144,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *
}
func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool {
- vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
@@ -157,9 +157,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
+ ttl, err := storage.ReadTTL(r.FormValue("ttl"))
+ if err != nil {
+ return nil, err
+ }
volumeGrowOption := &topology.VolumeGrowOption{
Collection: r.FormValue("collection"),
ReplicaPlacement: replicaPlacement,
+ Ttl: ttl,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
DataNode: r.FormValue("dataNode"),
diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go
index f67caaebd..e41867076 100644
--- a/go/weed/weed_server/raft_server.go
+++ b/go/weed/weed_server/raft_server.go
@@ -2,8 +2,8 @@ package weed_server
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/topology"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/topology"
"encoding/json"
"errors"
"fmt"
diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go
index 1ce24a963..4d51c0767 100644
--- a/go/weed/weed_server/raft_server_handlers.go
+++ b/go/weed/weed_server/raft_server_handlers.go
@@ -1,8 +1,8 @@
package weed_server
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
"encoding/json"
"github.com/goraft/raft"
"io/ioutil"
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index b118e8c35..2a9085f3b 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -1,8 +1,8 @@
package weed_server
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"math/rand"
"net/http"
"strconv"
@@ -35,7 +35,6 @@ func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, fol
}
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts)
- r.HandleFunc("/submit", secure(vs.whiteList, vs.submitFromVolumeServerHandler))
r.HandleFunc("/status", secure(vs.whiteList, vs.statusHandler))
r.HandleFunc("/admin/assign_volume", secure(vs.whiteList, vs.assignVolumeHandler))
r.HandleFunc("/admin/vacuum_volume_check", secure(vs.whiteList, vs.vacuumVolumeCheckHandler))
diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go
index eed198e4a..ce14f6a87 100644
--- a/go/weed/weed_server/volume_server_handlers.go
+++ b/go/weed/weed_server/volume_server_handlers.go
@@ -1,12 +1,12 @@
package weed_server
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/images"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/stats"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/topology"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/images"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/stats"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/topology"
"io"
"mime"
"mime/multipart"
diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go
index 5921a3b96..1118c8017 100644
--- a/go/weed/weed_server/volume_server_handlers_admin.go
+++ b/go/weed/weed_server/volume_server_handlers_admin.go
@@ -1,9 +1,9 @@
package weed_server
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/stats"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/stats"
+ "github.com/chrislusf/weed-fs/go/util"
"net/http"
"path/filepath"
)
@@ -16,7 +16,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
- err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"))
+ err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl"))
if err == nil {
writeJsonQuiet(w, r, map[string]string{"error": ""})
} else {
@@ -50,10 +50,6 @@ func (vs *VolumeServer) freezeVolumeHandler(w http.ResponseWriter, r *http.Reque
glog.V(2).Infoln("freeze volume =", r.FormValue("volume"), ", error =", err)
}
-func (vs *VolumeServer) submitFromVolumeServerHandler(w http.ResponseWriter, r *http.Request) {
- submitForClientHandler(w, r, vs.masterNode)
-}
-
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
diff --git a/go/weed/weed_server/volume_server_handlers_vacuum.go b/go/weed/weed_server/volume_server_handlers_vacuum.go
index 60a5e9742..b0600d799 100644
--- a/go/weed/weed_server/volume_server_handlers_vacuum.go
+++ b/go/weed/weed_server/volume_server_handlers_vacuum.go
@@ -1,7 +1,7 @@
package weed_server
import (
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"net/http"
)