aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2020-10-10 13:07:57 +0800
committerGitHub <noreply@github.com>2020-10-10 13:07:57 +0800
commit411e49f96494629fa3da334e8dc7cc15bd4d19cd (patch)
tree1756f0a2f86c871a6db67df7ecbd509c9df32233 /weed
parentac162fc85769cb1b2a1f8694f9644eae7d0ce6c8 (diff)
parent4a15e9c830de1b654515308e5be8380ffa34aefa (diff)
downloadseaweedfs-411e49f96494629fa3da334e8dc7cc15bd4d19cd.tar.xz
seaweedfs-411e49f96494629fa3da334e8dc7cc15bd4d19cd.zip
Merge pull request #23 from chrislusf/master
sync
Diffstat (limited to 'weed')
-rw-r--r--weed/command/benchmark.go9
-rw-r--r--weed/command/filer.go24
-rw-r--r--weed/command/master.go34
-rw-r--r--weed/command/mount_std.go27
-rw-r--r--weed/command/s3.go14
-rw-r--r--weed/command/scaffold.go2
-rw-r--r--weed/command/server.go7
-rw-r--r--weed/command/volume.go5
-rw-r--r--weed/filer/filechunk_manifest.go36
-rw-r--r--weed/filer/filechunks.go12
-rw-r--r--weed/filer/filer_delete_entry.go2
-rw-r--r--weed/filer/reader_at.go138
-rw-r--r--weed/filer/stream.go48
-rw-r--r--weed/filesys/dir.go9
-rw-r--r--weed/filesys/dir_link.go6
-rw-r--r--weed/filesys/wfs.go5
-rw-r--r--weed/operation/submit.go3
-rw-r--r--weed/pb/master.proto2
-rw-r--r--weed/pb/master_pb/master.pb.go238
-rw-r--r--weed/replication/repl_util/replication_utli.go40
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go25
-rw-r--r--weed/replication/sink/b2sink/b2_sink.go30
-rw-r--r--weed/replication/sink/gcssink/gcs_sink.go22
-rw-r--r--weed/replication/sink/s3sink/s3_write.go11
-rw-r--r--weed/replication/source/filer_source.go21
-rw-r--r--weed/s3api/auth_credentials.go8
-rw-r--r--weed/s3api/filer_util_tags.go104
-rw-r--r--weed/s3api/s3api_object_tagging_handlers.go117
-rw-r--r--weed/s3api/s3api_server.go17
-rw-r--r--weed/s3api/s3err/s3api_errors.go6
-rw-r--r--weed/s3api/stats.go27
-rw-r--r--weed/s3api/tags.go38
-rw-r--r--weed/s3api/tags_test.go50
-rw-r--r--weed/server/filer_grpc_server.go55
-rw-r--r--weed/server/filer_server.go33
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go1
-rw-r--r--weed/server/master_grpc_server.go17
-rw-r--r--weed/server/master_grpc_server_volume.go13
-rw-r--r--weed/server/master_server.go5
-rw-r--r--weed/server/raft_server.go133
-rw-r--r--weed/server/raft_server_handlers.go14
-rw-r--r--weed/server/volume_grpc_client_to_master.go2
-rw-r--r--weed/server/volume_server_handlers_read.go4
-rw-r--r--weed/server/volume_server_handlers_write.go3
-rw-r--r--weed/server/webdav_server.go2
-rw-r--r--weed/stats/metrics.go4
-rw-r--r--weed/storage/backend/volume_create_linux.go2
-rw-r--r--weed/storage/disk_location.go3
-rw-r--r--weed/storage/store.go4
-rw-r--r--weed/storage/store_ec.go4
-rw-r--r--weed/storage/volume_read_write.go3
-rw-r--r--weed/util/chunk_cache/chunk_cache.go32
-rw-r--r--weed/util/chunk_cache/chunk_cache_on_disk_test.go51
-rw-r--r--weed/util/chunk_cache/on_disk_cache_layer.go8
-rw-r--r--weed/util/constants.go2
-rw-r--r--weed/util/fullpath.go2
-rw-r--r--weed/wdclient/masterclient.go30
-rw-r--r--weed/wdclient/vid_map.go51
58 files changed, 1137 insertions, 478 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 4a9a9619a..8bb585d91 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -282,14 +282,19 @@ func readFiles(fileIdLineChan chan string, s *stat) {
start := time.Now()
var bytesRead int
var err error
- url, err := b.masterClient.LookupFileId(fid)
+ urls, err := b.masterClient.LookupFileId(fid)
if err != nil {
s.failed++
println("!!!! ", fid, " location not found!!!!!")
continue
}
var bytes []byte
- bytes, err = util.Get(url)
+ for _, url := range urls {
+ bytes, err = util.Get(url)
+ if err == nil {
+ break
+ }
+ }
bytesRead = len(bytes)
if err == nil {
s.completed++
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 7ff9fcd9a..e885eafc4 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -1,6 +1,7 @@
package command
import (
+ "fmt"
"net/http"
"strconv"
"strings"
@@ -18,7 +19,9 @@ import (
)
var (
- f FilerOptions
+ f FilerOptions
+ filerStartS3 *bool
+ filerS3Options S3Options
)
type FilerOptions struct {
@@ -51,7 +54,7 @@ func init() {
f.bindIp = cmdFiler.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
- f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
+ f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
@@ -60,6 +63,14 @@ func init() {
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
+
+ // start s3 on filer
+ filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
+ filerS3Options.port = cmdFiler.Flag.Int("s3.port", 8333, "s3 server http listen port")
+ filerS3Options.domainName = cmdFiler.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}")
+ filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
+ filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
+ filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
}
var cmdFiler = &Command{
@@ -89,6 +100,15 @@ func runFiler(cmd *Command, args []string) bool {
go stats_collect.StartMetricsServer(*f.metricsHttpPort)
+ if *filerStartS3 {
+ filerAddress := fmt.Sprintf("%s:%d", *f.ip, *f.port)
+ filerS3Options.filer = &filerAddress
+ go func() {
+ time.Sleep(2 * time.Second)
+ filerS3Options.startS3Server()
+ }()
+ }
+
f.startFiler()
return true
diff --git a/weed/command/master.go b/weed/command/master.go
index 144962f63..bf5f83875 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,15 +1,16 @@
package command
import (
+ "github.com/chrislusf/raft/protobuf"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc/reflection"
"net/http"
"os"
"runtime"
+ "sort"
"strconv"
"strings"
-
- "github.com/chrislusf/raft/protobuf"
- "github.com/gorilla/mux"
- "google.golang.org/grpc/reflection"
+ "time"
"github.com/chrislusf/seaweedfs/weed/util/grace"
@@ -41,6 +42,7 @@ type MasterOptions struct {
disableHttp *bool
metricsAddress *string
metricsIntervalSec *int
+ raftResumeState *bool
}
func init() {
@@ -59,6 +61,7 @@ func init() {
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>")
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
+ m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
}
var cmdMaster = &Command{
@@ -118,10 +121,10 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("Master startup error: %v", e)
}
// start raftServer
- raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
- peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, 5)
+ raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
+ peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, 5, *masterOption.raftResumeState)
if raftServer == nil {
- glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
+ glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
}
ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
@@ -139,6 +142,15 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort)
go grpcS.Serve(grpcL)
+ go func() {
+ time.Sleep(1500 * time.Millisecond)
+ if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
+ if ms.MasterClient.FindLeader(myMasterAddress) == "" {
+ raftServer.DoJoinCommand()
+ }
+ }
+ }()
+
go ms.MasterClient.KeepConnectedToMaster()
// start http server
@@ -172,6 +184,14 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st
return
}
+func isTheFirstOne(self string, peers []string) bool {
+ sort.Strings(peers)
+ if len(peers) <= 0 {
+ return true
+ }
+ return self == peers[0]
+}
+
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
return &weed_server.MasterOption{
Host: *m.ip,
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 7c0f56d3a..14374eb5c 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -7,6 +7,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"os"
+ "os/user"
"path"
"runtime"
"strconv"
@@ -92,6 +93,29 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
fileInfo, err := os.Stat(dir)
+ uid, gid := uint32(0), uint32(0)
+ mountMode := os.ModeDir | 0777
+ if err == nil {
+ mountMode = os.ModeDir | fileInfo.Mode()
+ uid, gid = util.GetFileUidGid(fileInfo)
+ fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, fileInfo.Mode())
+ } else {
+ fmt.Printf("can not stat %s\n", dir)
+ return false
+ }
+
+ if uid == 0 {
+ if u, err := user.Current(); err == nil {
+ if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
+ uid = uint32(parsedId)
+ }
+ if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
+ gid = uint32(parsedId)
+ }
+ fmt.Printf("current uid=%d gid=%d\n", uid, gid)
+ }
+ }
+
// mapping uid, gid
uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
if err != nil {
@@ -150,6 +174,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
EntryCacheTtl: 3 * time.Second,
+ MountUid: uid,
+ MountGid: gid,
+ MountMode: mountMode,
MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(),
Umask: umask,
diff --git a/weed/command/s3.go b/weed/command/s3.go
index e94decaf3..7d0954c0c 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -54,7 +54,13 @@ var cmdS3 = &Command{
{
"identities": [
{
- "name": "some_name",
+ "name": "anonymous",
+ "actions": [
+ "Read"
+ ]
+ },
+ {
+ "name": "some_admin_user",
"credentials": [
{
"accessKey": "some_access_key1",
@@ -64,6 +70,8 @@ var cmdS3 = &Command{
"actions": [
"Admin",
"Read",
+ "List",
+ "Tagging",
"Write"
]
},
@@ -89,6 +97,8 @@ var cmdS3 = &Command{
],
"actions": [
"Read",
+ "List",
+ "Tagging",
"Write"
]
},
@@ -102,6 +112,8 @@ var cmdS3 = &Command{
],
"actions": [
"Read:bucket1",
+ "List:bucket1",
+ "Tagging:bucket1",
"Write:bucket1"
]
}
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index dd12f12a2..c36e4a25f 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -140,6 +140,8 @@ keyspace="seaweedfs"
hosts=[
"localhost:9042",
]
+username=""
+password=""
[redis2]
enabled = false
diff --git a/weed/command/server.go b/weed/command/server.go
index 91d8d22c6..80fb14600 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -81,11 +81,12 @@ func init() {
masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
+ masterOptions.raftResumeState = cmdServer.Flag.Bool("resumeState", false, "resume previous state on start master server")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
- filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
+ filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
@@ -165,10 +166,6 @@ func runServer(cmd *Command, args []string) bool {
s3Options.filer = &filerAddress
msgBrokerOptions.filer = &filerAddress
- if *filerOptions.defaultReplicaPlacement == "" {
- *filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication
- }
-
runtime.GOMAXPROCS(runtime.NumCPU())
go stats_collect.StartMetricsServer(*serverMetricsHttpPort)
diff --git a/weed/command/volume.go b/weed/command/volume.go
index dfc649ba5..d73c24ed1 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -138,6 +138,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
glog.Fatalf("The max specified in -max not a valid number %s", maxString)
}
}
+ if len(v.folderMaxLimits) == 1 && len(v.folders) > 1 {
+ for i := 0; i < len(v.folders)-1; i++ {
+ v.folderMaxLimits = append(v.folderMaxLimits, v.folderMaxLimits[0])
+ }
+ }
if len(v.folders) != len(v.folderMaxLimits) {
glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits))
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 37b172357..9e53e008f 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"math"
+ "time"
"github.com/golang/protobuf/proto"
@@ -84,21 +85,40 @@ func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *fil
// TODO fetch from cache for weed mount?
func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
- urlString, err := lookupFileIdFn(fileId)
+ urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return nil, err
}
+ return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
+}
+
+func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
+
+ var err error
var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
- buffer.Write(data)
- })
- if err != nil {
- glog.V(0).Infof("read %s failed, err: %v", fileId, err)
- return nil, err
+
+ for waitTime := time.Second; waitTime < 10*time.Second; waitTime += waitTime / 2 {
+ for _, urlString := range urlStrings {
+ err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(0).Infof("read %s failed, err: %v", urlString, err)
+ buffer.Reset()
+ } else {
+ break
+ }
+ }
+ if err != nil {
+ glog.V(0).Infof("sleep for %v before retrying reading", waitTime)
+ time.Sleep(waitTime)
+ } else {
+ break
+ }
}
- return buffer.Bytes(), nil
+ return buffer.Bytes(), err
}
func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index db55eec00..c75a35f79 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -1,13 +1,15 @@
package filer
import (
+ "bytes"
+ "encoding/hex"
"fmt"
- "hash/fnv"
"math"
"sort"
"sync"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
@@ -42,12 +44,12 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
if len(chunks) == 1 {
return chunks[0].ETag
}
-
- h := fnv.New32a()
+ md5_digests := [][]byte{}
for _, c := range chunks {
- h.Write([]byte(c.ETag))
+ md5_decoded, _ := hex.DecodeString(c.ETag)
+ md5_digests = append(md5_digests, md5_decoded)
}
- return fmt.Sprintf("%x", h.Sum32())
+ return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5_digests, nil)), len(chunks))
}
func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 6c9ff56d3..69219fbfa 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -77,7 +77,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
if lastFileName == "" && !isRecursive && len(entries) > 0 {
// only for first iteration in the loop
glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
- return nil, nil,fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+ return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
}
for _, sub := range entries {
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 9f338782e..fa51df687 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -3,52 +3,72 @@ package filer
import (
"context"
"fmt"
- "io"
- "sync"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/chrislusf/seaweedfs/weed/wdclient"
+ "github.com/golang/groupcache/singleflight"
+ "io"
+ "math/rand"
+ "sync"
)
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews []*ChunkView
- lookupFileId func(fileId string) (targetUrl string, err error)
+ lookupFileId LookupFileIdFunctionType
readerLock sync.Mutex
fileSize int64
- chunkCache chunk_cache.ChunkCache
+ fetchGroup singleflight.Group
+ lastChunkFileId string
+ lastChunkData []byte
+ chunkCache chunk_cache.ChunkCache
}
// var _ = io.ReaderAt(&ChunkReadAt{})
-type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
+type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
- return func(fileId string) (targetUrl string, err error) {
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- vid := VolumeId(fileId)
- resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
- VolumeIds: []string{vid},
- })
- if err != nil {
- return err
- }
- locations := resp.LocationsMap[vid]
- if locations == nil || len(locations.Locations) == 0 {
- glog.V(0).Infof("failed to locate %s", fileId)
- return fmt.Errorf("failed to locate %s", fileId)
- }
+ vidCache := make(map[string]*filer_pb.Locations)
+ return func(fileId string) (targetUrls []string, err error) {
+ vid := VolumeId(fileId)
+ locations, found := vidCache[vid]
+
+ if !found {
+ // println("looking up volume", vid)
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: []string{vid},
+ })
+ if err != nil {
+ return err
+ }
+
+ locations = resp.LocationsMap[vid]
+ if locations == nil || len(locations.Locations) == 0 {
+ glog.V(0).Infof("failed to locate %s", fileId)
+ return fmt.Errorf("failed to locate %s", fileId)
+ }
+ vidCache[vid] = locations
+
+ return nil
+ })
+ }
- volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
+ for _, loc := range locations.Locations {
+ volumeServerAddress := filerClient.AdjustedUrl(loc.Url)
+ targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
+ targetUrls = append(targetUrls, targetUrl)
+ }
- targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
+ for i := len(targetUrls) - 1; i > 0; i-- {
+ j := rand.Intn(i + 1)
+ targetUrls[i], targetUrls[j] = targetUrls[j], targetUrls[i]
+ }
- return nil
- })
return
}
}
@@ -76,10 +96,16 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
var buffer []byte
startOffset, remaining := offset, int64(len(p))
+ var nextChunk *ChunkView
for i, chunk := range c.chunkViews {
if remaining <= 0 {
break
}
+ if i+1 < len(c.chunkViews) {
+ nextChunk = c.chunkViews[i+1]
+ } else {
+ nextChunk = nil
+ }
if startOffset < chunk.LogicOffset {
gap := int(chunk.LogicOffset - startOffset)
glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
@@ -95,7 +121,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
continue
}
glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
- buffer, err = c.readFromWholeChunkData(chunk)
+ buffer, err = c.readFromWholeChunkData(chunk, nextChunk)
if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
return
@@ -123,27 +149,63 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
}
-func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
+func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) {
- glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
+ if c.lastChunkFileId == chunkView.FileId {
+ return c.lastChunkData, nil
+ }
- chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
- if chunkData != nil {
- glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData)))
- } else {
- glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId)
- chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
- if err != nil {
- return
+ v, doErr := c.readOneWholeChunk(chunkView)
+
+ if doErr != nil {
+ return nil, doErr
+ }
+
+ chunkData = v.([]byte)
+
+ c.lastChunkData = chunkData
+ c.lastChunkFileId = chunkView.FileId
+
+ for _, nextChunkView := range nextChunkViews {
+ if c.chunkCache != nil && nextChunkView != nil {
+ go c.readOneWholeChunk(nextChunkView)
}
- c.chunkCache.SetChunk(chunkView.FileId, chunkData)
}
return
}
-func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) {
+
+ var err error
+
+ return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) {
+
+ glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
+
+ data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+ if data != nil {
+ glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
+ } else {
+ var err error
+ data, err = c.doFetchFullChunkData(chunkView)
+ if err != nil {
+ return data, err
+ }
+ c.chunkCache.SetChunk(chunkView.FileId, data)
+ }
+ return data, err
+ })
+}
+
+func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) {
+
+ glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
+
+ data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
+
+ glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
- return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped)
+ return data, err
}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index dc6e414ca..f6e2a7643 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -17,28 +17,28 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
- fileId2Url := make(map[string]string)
+ fileId2Url := make(map[string][]string)
for _, chunkView := range chunkViews {
- urlString, err := masterClient.LookupFileId(chunkView.FileId)
+ urlStrings, err := masterClient.LookupFileId(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
}
- fileId2Url[chunkView.FileId] = urlString
+ fileId2Url[chunkView.FileId] = urlStrings
}
for _, chunkView := range chunkViews {
- urlString := fileId2Url[chunkView.FileId]
- err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
- w.Write(data)
- })
- if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ urlStrings := fileId2Url[chunkView.FileId]
+
+ data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ if err == nil {
return err
}
+ w.Write(data)
+
}
return nil
@@ -51,25 +51,24 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
buffer := bytes.Buffer{}
- lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+ lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
return masterClient.LookupFileId(fileId)
}
chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
for _, chunkView := range chunkViews {
- urlString, err := lookupFileIdFn(chunkView.FileId)
+ urlStrings, err := lookupFileIdFn(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return nil, err
}
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
- buffer.Write(data)
- })
+
+ data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
return nil, err
}
+ buffer.Write(data)
}
return buffer.Bytes(), nil
}
@@ -89,7 +88,7 @@ var _ = io.ReadSeeker(&ChunkStreamReader{})
func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
- lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+ lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
return masterClient.LookupFileId(fileId)
}
@@ -169,17 +168,24 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
}
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
- urlString, err := c.lookupFileId(chunkView.FileId)
+ urlStrings, err := c.lookupFileId(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
}
var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
- buffer.Write(data)
- })
+ for _, urlString := range urlStrings {
+ err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ buffer.Reset()
+ } else {
+ break
+ }
+ }
if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
return err
}
c.buffer = buffer.Bytes()
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 7d93dbd9f..4dede3a8b 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -82,9 +82,9 @@ func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f
func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
attr.Valid = time.Hour
- attr.Uid = dir.entry.Attributes.Uid
- attr.Gid = dir.entry.Attributes.Gid
- attr.Mode = os.FileMode(dir.entry.Attributes.FileMode)
+ attr.Uid = dir.wfs.option.MountUid
+ attr.Gid = dir.wfs.option.MountGid
+ attr.Mode = dir.wfs.option.MountMode
attr.Crtime = dir.wfs.option.MountCtime
attr.Ctime = dir.wfs.option.MountCtime
attr.Mtime = dir.wfs.option.MountMtime
@@ -354,7 +354,8 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
glog.V(3).Infof("remove directory entry: %v", req)
- err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false, []int32{dir.wfs.signature})
+ ignoreRecursiveErr := true // ignore recursion error since the OS should manage it
+ err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, ignoreRecursiveErr, false, []int32{dir.wfs.signature})
if err != nil {
glog.V(0).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err)
if strings.Contains(err.Error(), "non-empty") {
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index 368ded442..f6bc41b56 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -18,6 +18,10 @@ var _ = fs.NodeLinker(&Dir{})
var _ = fs.NodeSymlinker(&Dir{})
var _ = fs.NodeReadlinker(&File{})
+const (
+ HARD_LINK_MARKER = '\x01'
+)
+
func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (fs.Node, error) {
oldFile, ok := old.(*File)
@@ -33,7 +37,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
// update old file to hardlink mode
if len(oldFile.entry.HardLinkId) == 0 {
- oldFile.entry.HardLinkId = util.RandomBytes(16)
+ oldFile.entry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
oldFile.entry.HardLinkCounter = 1
}
oldFile.entry.HardLinkCounter++
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index ef31a9258..57b4c3da5 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -37,6 +37,9 @@ type Option struct {
EntryCacheTtl time.Duration
Umask os.FileMode
+ MountUid uint32
+ MountGid uint32
+ MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
@@ -86,7 +89,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
if option.CacheSizeMB > 0 {
os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
- wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB)
+ wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
}
wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), option.UidGidMapper)
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index e8bec382a..25843c892 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -170,6 +170,9 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
}
}
fileUrl := "http://" + ret.Url + "/" + id
+ if usePublicUrl {
+ fileUrl = "http://" + ret.PublicUrl + "/" + id
+ }
count, e := upload_one_chunk(
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
diff --git a/weed/pb/master.proto b/weed/pb/master.proto
index e96582df9..96c3c75cc 100644
--- a/weed/pb/master.proto
+++ b/weed/pb/master.proto
@@ -274,6 +274,8 @@ message GetMasterConfigurationResponse {
string metrics_address = 1;
uint32 metrics_interval_seconds = 2;
repeated StorageBackend storage_backends = 3;
+ string default_replication = 4;
+ string leader = 5;
}
message ListMasterClientsRequest {
diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go
index 98e501db3..d23366ade 100644
--- a/weed/pb/master_pb/master.pb.go
+++ b/weed/pb/master_pb/master.pb.go
@@ -2279,6 +2279,8 @@ type GetMasterConfigurationResponse struct {
MetricsAddress string `protobuf:"bytes,1,opt,name=metrics_address,json=metricsAddress,proto3" json:"metrics_address,omitempty"`
MetricsIntervalSeconds uint32 `protobuf:"varint,2,opt,name=metrics_interval_seconds,json=metricsIntervalSeconds,proto3" json:"metrics_interval_seconds,omitempty"`
StorageBackends []*StorageBackend `protobuf:"bytes,3,rep,name=storage_backends,json=storageBackends,proto3" json:"storage_backends,omitempty"`
+ DefaultReplication string `protobuf:"bytes,4,opt,name=default_replication,json=defaultReplication,proto3" json:"default_replication,omitempty"`
+ Leader string `protobuf:"bytes,5,opt,name=leader,proto3" json:"leader,omitempty"`
}
func (x *GetMasterConfigurationResponse) Reset() {
@@ -2334,6 +2336,20 @@ func (x *GetMasterConfigurationResponse) GetStorageBackends() []*StorageBackend
return nil
}
+func (x *GetMasterConfigurationResponse) GetDefaultReplication() string {
+ if x != nil {
+ return x.DefaultReplication
+ }
+ return ""
+}
+
+func (x *GetMasterConfigurationResponse) GetLeader() string {
+ if x != nil {
+ return x.Leader
+ }
+ return ""
+}
+
type ListMasterClientsRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -3197,7 +3213,7 @@ var file_master_proto_rawDesc = []byte{
0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x6c, 0x6f, 0x63,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x1f, 0x0a, 0x1d, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x73,
0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e,
- 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xc9, 0x01, 0x0a, 0x1e, 0x47, 0x65, 0x74, 0x4d,
+ 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x92, 0x02, 0x0a, 0x1e, 0x47, 0x65, 0x74, 0x4d,
0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x6d, 0x65,
0x74, 0x72, 0x69, 0x63, 0x73, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20,
@@ -3210,115 +3226,119 @@ var file_master_proto_rawDesc = []byte{
0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72,
0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x42, 0x61, 0x63, 0x6b, 0x65,
0x6e, 0x64, 0x52, 0x0f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x42, 0x61, 0x63, 0x6b, 0x65,
- 0x6e, 0x64, 0x73, 0x22, 0x3b, 0x0a, 0x18, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65,
- 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
- 0x1f, 0x0a, 0x0b, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65,
- 0x22, 0x42, 0x0a, 0x19, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6c,
- 0x69, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a,
- 0x0e, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18,
- 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x67, 0x72, 0x70, 0x63, 0x41, 0x64, 0x64, 0x72, 0x65,
- 0x73, 0x73, 0x65, 0x73, 0x22, 0x8a, 0x01, 0x0a, 0x16, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64,
- 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
- 0x25, 0x0a, 0x0e, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65,
- 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75,
- 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x2c, 0x0a, 0x12, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f,
- 0x75, 0x73, 0x5f, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01,
- 0x28, 0x03, 0x52, 0x10, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x4c, 0x6f, 0x63, 0x6b,
- 0x54, 0x69, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x6e, 0x61, 0x6d,
- 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x61, 0x6d,
- 0x65, 0x22, 0x4d, 0x0a, 0x17, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54,
- 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05,
- 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x74, 0x6f, 0x6b,
- 0x65, 0x6e, 0x12, 0x1c, 0x0a, 0x0a, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x74, 0x73, 0x5f, 0x6e, 0x73,
- 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x73, 0x4e, 0x73,
- 0x22, 0x8c, 0x01, 0x0a, 0x18, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69,
- 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a,
- 0x0e, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18,
- 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x54,
- 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x2c, 0x0a, 0x12, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73,
- 0x5f, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03,
- 0x52, 0x10, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x4c, 0x6f, 0x63, 0x6b, 0x54, 0x69,
- 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18,
- 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x61, 0x6d, 0x65, 0x22,
- 0x1b, 0x0a, 0x19, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54,
- 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xf7, 0x08, 0x0a,
- 0x07, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x12, 0x49, 0x0a, 0x0d, 0x53, 0x65, 0x6e, 0x64,
- 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x12, 0x14, 0x2e, 0x6d, 0x61, 0x73, 0x74,
- 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x1a,
- 0x1c, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x72,
- 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28,
- 0x01, 0x30, 0x01, 0x12, 0x51, 0x0a, 0x0d, 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65,
- 0x63, 0x74, 0x65, 0x64, 0x12, 0x1f, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62,
- 0x2e, 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x65,
- 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70,
- 0x62, 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e,
- 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x51, 0x0a, 0x0c, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70,
- 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f,
- 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52,
- 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f,
- 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52,
- 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3f, 0x0a, 0x06, 0x41, 0x73, 0x73,
- 0x69, 0x67, 0x6e, 0x12, 0x18, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e,
- 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e,
- 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e,
- 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0a, 0x53, 0x74,
- 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65,
- 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52,
- 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f,
- 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73,
- 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0e, 0x43, 0x6f, 0x6c, 0x6c, 0x65,
- 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x20, 0x2e, 0x6d, 0x61, 0x73, 0x74,
- 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e,
- 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x6d, 0x61,
- 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69,
- 0x6f, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
- 0x12, 0x5d, 0x0a, 0x10, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65,
- 0x6c, 0x65, 0x74, 0x65, 0x12, 0x22, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62,
- 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x6c, 0x65, 0x74,
- 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65,
- 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44,
- 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
- 0x4b, 0x0a, 0x0a, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1c, 0x2e,
- 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65,
- 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x61,
- 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4c, 0x69,
- 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0e,
- 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x45, 0x63, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x20,
- 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75,
- 0x70, 0x45, 0x63, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
- 0x1a, 0x21, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f,
- 0x6b, 0x75, 0x70, 0x45, 0x63, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
- 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x16, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x73, 0x74,
- 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12,
- 0x28, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4d,
- 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69,
- 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x6d, 0x61, 0x73, 0x74,
- 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43,
- 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70,
- 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61,
- 0x73, 0x74, 0x65, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x23, 0x2e, 0x6d, 0x61,
- 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74,
- 0x65, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
- 0x1a, 0x24, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73,
+ 0x6e, 0x64, 0x73, 0x12, 0x2f, 0x0a, 0x13, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x5f, 0x72,
+ 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x12, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61,
+ 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x05,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x22, 0x3b, 0x0a, 0x18,
+ 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74,
+ 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6c, 0x69, 0x65,
+ 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63,
+ 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x22, 0x42, 0x0a, 0x19, 0x4c, 0x69, 0x73,
0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65,
- 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5a, 0x0a, 0x0f, 0x4c, 0x65, 0x61, 0x73,
- 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x21, 0x2e, 0x6d, 0x61,
- 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d,
- 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22,
- 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x65, 0x61, 0x73, 0x65,
- 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
- 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x11, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41,
- 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x23, 0x2e, 0x6d, 0x61, 0x73, 0x74,
- 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d,
- 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24,
- 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61,
- 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70,
- 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
- 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73,
- 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62,
- 0x2f, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
- 0x6f, 0x33,
+ 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x61,
+ 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d,
+ 0x67, 0x72, 0x70, 0x63, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x22, 0x8a, 0x01,
+ 0x0a, 0x16, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65,
+ 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x72, 0x65, 0x76,
+ 0x69, 0x6f, 0x75, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03,
+ 0x52, 0x0d, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12,
+ 0x2c, 0x0a, 0x12, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x5f, 0x6c, 0x6f, 0x63, 0x6b,
+ 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x10, 0x70, 0x72, 0x65,
+ 0x76, 0x69, 0x6f, 0x75, 0x73, 0x4c, 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x1b, 0x0a,
+ 0x09, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x4d, 0x0a, 0x17, 0x4c, 0x65,
+ 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73,
+ 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1c, 0x0a, 0x0a, 0x6c,
+ 0x6f, 0x63, 0x6b, 0x5f, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52,
+ 0x08, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x73, 0x4e, 0x73, 0x22, 0x8c, 0x01, 0x0a, 0x18, 0x52, 0x65,
+ 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f,
+ 0x75, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d,
+ 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x2c, 0x0a,
+ 0x12, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x5f, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x74,
+ 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x10, 0x70, 0x72, 0x65, 0x76, 0x69,
+ 0x6f, 0x75, 0x73, 0x4c, 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6c,
+ 0x6f, 0x63, 0x6b, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08,
+ 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x1b, 0x0a, 0x19, 0x52, 0x65, 0x6c, 0x65,
+ 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73,
+ 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xf7, 0x08, 0x0a, 0x07, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65,
+ 0x64, 0x12, 0x49, 0x0a, 0x0d, 0x53, 0x65, 0x6e, 0x64, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65,
+ 0x61, 0x74, 0x12, 0x14, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x48,
+ 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x1a, 0x1c, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65,
+ 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65,
+ 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x51, 0x0a, 0x0d,
+ 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x12, 0x1f, 0x2e,
+ 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f,
+ 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19,
+ 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d,
+ 0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12,
+ 0x51, 0x0a, 0x0c, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x12,
+ 0x1e, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b,
+ 0x75, 0x70, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
+ 0x1f, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b,
+ 0x75, 0x70, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
+ 0x22, 0x00, 0x12, 0x3f, 0x0a, 0x06, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x12, 0x18, 0x2e, 0x6d,
+ 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f,
+ 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+ 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63,
+ 0x73, 0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74,
+ 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
+ 0x1d, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74,
+ 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
+ 0x12, 0x57, 0x0a, 0x0e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69,
+ 0x73, 0x74, 0x12, 0x20, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43,
+ 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71,
+ 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62,
+ 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x52,
+ 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x10, 0x43, 0x6f, 0x6c,
+ 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x22, 0x2e,
+ 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63,
+ 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
+ 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f,
+ 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65,
+ 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0a, 0x56, 0x6f, 0x6c, 0x75,
+ 0x6d, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f,
+ 0x70, 0x62, 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71,
+ 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62,
+ 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f,
+ 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x45,
+ 0x63, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x20, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72,
+ 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x45, 0x63, 0x56, 0x6f, 0x6c, 0x75,
+ 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x6d, 0x61, 0x73, 0x74,
+ 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x45, 0x63, 0x56, 0x6f,
+ 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f,
+ 0x0a, 0x16, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69,
+ 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65,
+ 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f,
+ 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65,
+ 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x47,
+ 0x65, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72,
+ 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
+ 0x60, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6c, 0x69,
+ 0x65, 0x6e, 0x74, 0x73, 0x12, 0x23, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62,
+ 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e,
+ 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x6d, 0x61, 0x73, 0x74,
+ 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72,
+ 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
+ 0x00, 0x12, 0x5a, 0x0a, 0x0f, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54,
+ 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x21, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62,
+ 0x2e, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e,
+ 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72,
+ 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f,
+ 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a,
+ 0x11, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b,
+ 0x65, 0x6e, 0x12, 0x23, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52,
+ 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e,
+ 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72,
+ 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e,
+ 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42,
+ 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68,
+ 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66,
+ 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72,
+ 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go
new file mode 100644
index 000000000..9b18275b5
--- /dev/null
+++ b/weed/replication/repl_util/replication_utli.go
@@ -0,0 +1,40 @@
+package repl_util
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
+
+ for _, chunk := range chunkViews {
+
+ fileUrls, err := filerSource.LookupFileId(chunk.FileId)
+ if err != nil {
+ return err
+ }
+
+ var writeErr error
+
+ for _, fileUrl := range fileUrls {
+ err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ writeErr = writeFunc(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read from %s: %v", fileUrl, err)
+ } else if writeErr != nil {
+ glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr)
+ } else {
+ break
+ }
+ }
+
+ if err != nil {
+ return err
+ }
+
+ }
+ return nil
+}
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index dab5cf4f4..df70be64b 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"net/url"
"strings"
@@ -107,25 +108,13 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
return err
}
- for _, chunk := range chunkViews {
-
- fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
- if err != nil {
- return err
- }
-
- var writeErr error
- readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
- _, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
- })
-
- if readErr != nil {
- return readErr
- }
- if writeErr != nil {
- return writeErr
- }
+ writeFunc := func(data []byte) error {
+ _, writeErr := appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
+ return writeErr
+ }
+ if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
+ return err
}
return nil
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index cf212f129..24f0ecbbc 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -2,6 +2,7 @@ package B2Sink
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -95,31 +96,18 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int
targetObject := bucket.Object(key)
writer := targetObject.NewWriter(context.Background())
- for _, chunk := range chunkViews {
-
- fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
- if err != nil {
- return err
- }
-
- var writeErr error
- readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
- _, err := writer.Write(data)
- if err != nil {
- writeErr = err
- }
- })
+ writeFunc := func(data []byte) error {
+ _, writeErr := writer.Write(data)
+ return writeErr
+ }
- if readErr != nil {
- return readErr
- }
- if writeErr != nil {
- return writeErr
- }
+ defer writer.Close()
+ if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
+ return err
}
- return writer.Close()
+ return nil
}
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index c6bfa212a..badabc32c 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -3,6 +3,7 @@ package gcssink
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"os"
"cloud.google.com/go/storage"
@@ -93,25 +94,14 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in
chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())
+ defer wc.Close()
- for _, chunk := range chunkViews {
-
- fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
- if err != nil {
- return err
- }
-
- err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
- wc.Write(data)
- })
-
- if err != nil {
- return err
- }
-
+ writeFunc := func(data []byte) error {
+ _, writeErr := wc.Write(data)
+ return writeErr
}
- if err := wc.Close(); err != nil {
+ if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err
}
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index 8a8e7a92b..45265d1ba 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -157,11 +157,18 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou
}
func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, error) {
- fileUrl, err := s3sink.filerSource.LookupFileId(chunk.FileId)
+ fileUrls, err := s3sink.filerSource.LookupFileId(chunk.FileId)
if err != nil {
return nil, err
}
buf := make([]byte, chunk.Size)
- util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf)
+ for _, fileUrl := range fileUrls {
+ _, err = util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf)
+ if err != nil {
+ glog.V(1).Infof("read from %s: %v", fileUrl, err)
+ } else {
+ break
+ }
+ }
return bytes.NewReader(buf), nil
}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 9106ee98b..c3ef8835c 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -41,7 +41,7 @@ func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error)
return nil
}
-func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
+func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) {
vid2Locations := make(map[string]*filer_pb.Locations)
@@ -64,29 +64,38 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
if err != nil {
glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
- return "", fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
+ return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
}
locations := vid2Locations[vid]
if locations == nil || len(locations.Locations) == 0 {
glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
- return "", fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
+ return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
}
- fileUrl = fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
+ for _, loc := range locations.Locations {
+ fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part))
+ }
return
}
func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) {
- fileUrl, err := fs.LookupFileId(part)
+ fileUrls, err := fs.LookupFileId(part)
if err != nil {
return "", nil, nil, err
}
- filename, header, resp, err = util.DownloadFile(fileUrl)
+ for _, fileUrl := range fileUrls {
+ filename, header, resp, err = util.DownloadFile(fileUrl)
+ if err != nil {
+ glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
+ } else {
+ break
+ }
+ }
return filename, header, resp, err
}
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index 31519e6e3..2b7666345 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -16,9 +16,11 @@ import (
type Action string
const (
- ACTION_READ = "Read"
- ACTION_WRITE = "Write"
- ACTION_ADMIN = "Admin"
+ ACTION_READ = "Read"
+ ACTION_WRITE = "Write"
+ ACTION_ADMIN = "Admin"
+ ACTION_TAGGING = "Tagging"
+ ACTION_LIST = "List"
)
type Iam interface {
diff --git a/weed/s3api/filer_util_tags.go b/weed/s3api/filer_util_tags.go
new file mode 100644
index 000000000..3d4da7825
--- /dev/null
+++ b/weed/s3api/filer_util_tags.go
@@ -0,0 +1,104 @@
+package s3api
+
+import (
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+const (
+ S3TAG_PREFIX = "s3-"
+)
+
+func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (tags map[string]string, err error) {
+
+ err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: parentDirectoryPath,
+ Name: entryName,
+ })
+ if err != nil {
+ return err
+ }
+ tags = make(map[string]string)
+ for k, v := range resp.Entry.Extended {
+ if strings.HasPrefix(k, S3TAG_PREFIX) {
+ tags[k[len(S3TAG_PREFIX):]] = string(v)
+ }
+ }
+ return nil
+ })
+ return
+}
+
+func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, tags map[string]string) (err error) {
+
+ return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: parentDirectoryPath,
+ Name: entryName,
+ })
+ if err != nil {
+ return err
+ }
+
+ for k, _ := range resp.Entry.Extended {
+ if strings.HasPrefix(k, S3TAG_PREFIX) {
+ delete(resp.Entry.Extended, k)
+ }
+ }
+
+ if resp.Entry.Extended == nil {
+ resp.Entry.Extended = make(map[string][]byte)
+ }
+ for k, v := range tags {
+ resp.Entry.Extended[S3TAG_PREFIX+k] = []byte(v)
+ }
+
+ return filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{
+ Directory: parentDirectoryPath,
+ Entry: resp.Entry,
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ })
+
+ })
+
+}
+
+func (s3a *S3ApiServer) rmTags(parentDirectoryPath string, entryName string) (err error) {
+
+ return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: parentDirectoryPath,
+ Name: entryName,
+ })
+ if err != nil {
+ return err
+ }
+
+ hasDeletion := false
+ for k, _ := range resp.Entry.Extended {
+ if strings.HasPrefix(k, S3TAG_PREFIX) {
+ delete(resp.Entry.Extended, k)
+ hasDeletion = true
+ }
+ }
+
+ if !hasDeletion {
+ return nil
+ }
+
+ return filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{
+ Directory: parentDirectoryPath,
+ Entry: resp.Entry,
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ })
+
+ })
+
+}
diff --git a/weed/s3api/s3api_object_tagging_handlers.go b/weed/s3api/s3api_object_tagging_handlers.go
new file mode 100644
index 000000000..94719834c
--- /dev/null
+++ b/weed/s3api/s3api_object_tagging_handlers.go
@@ -0,0 +1,117 @@
+package s3api
+
+import (
+ "encoding/xml"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "io/ioutil"
+ "net/http"
+)
+
+// GetObjectTaggingHandler - GET object tagging
+// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectTagging.html
+func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+ bucket, object := getBucketAndObject(r)
+
+ target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
+ dir, name := target.DirAndName()
+
+ tags, err := s3a.getTags(dir, name)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
+ } else {
+ glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
+ }
+ return
+ }
+
+ writeSuccessResponseXML(w, encodeResponse(FromTags(tags)))
+
+}
+
+// PutObjectTaggingHandler Put object tagging
+// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html
+func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+ bucket, object := getBucketAndObject(r)
+
+ target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
+ dir, name := target.DirAndName()
+
+ tagging := &Tagging{}
+ input, err := ioutil.ReadAll(io.LimitReader(r.Body, r.ContentLength))
+ if err != nil {
+ glog.Errorf("PutObjectTaggingHandler read input %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
+ return
+ }
+ if err = xml.Unmarshal(input, tagging); err != nil {
+ glog.Errorf("PutObjectTaggingHandler Unmarshal %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrMalformedXML, r.URL)
+ return
+ }
+ tags := tagging.ToTags()
+ if len(tags) > 10 {
+ glog.Errorf("PutObjectTaggingHandler tags %s: %d tags more than 10", r.URL, len(tags))
+ writeErrorResponse(w, s3err.ErrInvalidTag, r.URL)
+ return
+ }
+ for k, v := range tags {
+ if len(k) > 128 {
+ glog.Errorf("PutObjectTaggingHandler tags %s: tag key %s longer than 128", r.URL, k)
+ writeErrorResponse(w, s3err.ErrInvalidTag, r.URL)
+ return
+ }
+ if len(v) > 256 {
+ glog.Errorf("PutObjectTaggingHandler tags %s: tag value %s longer than 256", r.URL, v)
+ writeErrorResponse(w, s3err.ErrInvalidTag, r.URL)
+ return
+ }
+ }
+
+ if err = s3a.setTags(dir, name, tagging.ToTags()); err != nil {
+ if err == filer_pb.ErrNotFound {
+ glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
+ } else {
+ glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
+ }
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+
+}
+
+// DeleteObjectTaggingHandler Delete object tagging
+// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjectTagging.html
+func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+ bucket, object := getBucketAndObject(r)
+
+ target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
+ dir, name := target.DirAndName()
+
+ err := s3a.rmTags(dir, name)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
+ } else {
+ glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
+ }
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 5ddfdafd0..1ab80c3ee 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -64,9 +64,16 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
// AbortMultipartUpload
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.AbortMultipartUploadHandler, ACTION_WRITE), "DELETE")).Queries("uploadId", "{uploadId:.*}")
// ListObjectParts
- bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectPartsHandler, ACTION_WRITE), "GET")).Queries("uploadId", "{uploadId:.*}")
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectPartsHandler, ACTION_READ), "GET")).Queries("uploadId", "{uploadId:.*}")
// ListMultipartUploads
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListMultipartUploadsHandler, ACTION_WRITE), "GET")).Queries("uploads", "")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListMultipartUploadsHandler, ACTION_READ), "GET")).Queries("uploads", "")
+
+ // GetObjectTagging
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectTaggingHandler, ACTION_READ), "GET")).Queries("tagging", "")
+ // PutObjectTagging
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectTaggingHandler, ACTION_TAGGING), "PUT")).Queries("tagging", "")
+ // DeleteObjectTagging
+ bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING), "DELETE")).Queries("tagging", "")
// CopyObject
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectHandler, ACTION_WRITE), "COPY"))
@@ -81,11 +88,11 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketHandler, ACTION_WRITE), "DELETE"))
// ListObjectsV2
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV2Handler, ACTION_READ), "LIST")).Queries("list-type", "2")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV2Handler, ACTION_LIST), "LIST")).Queries("list-type", "2")
// GetObject, but directory listing is not supported
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectHandler, ACTION_READ), "GET"))
// ListObjectsV1 (Legacy)
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV1Handler, ACTION_READ), "LIST"))
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV1Handler, ACTION_LIST), "LIST"))
// PostPolicy
bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.PostPolicyBucketHandler, ACTION_WRITE), "POST"))
@@ -112,7 +119,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
}
// ListBuckets
- apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.iam.Auth(s3a.ListBucketsHandler, ACTION_READ), "LIST"))
+ apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.iam.Auth(s3a.ListBucketsHandler, ACTION_ADMIN), "LIST"))
// NotFound
apiRouter.NotFoundHandler = http.HandlerFunc(notFoundHandler)
diff --git a/weed/s3api/s3err/s3api_errors.go b/weed/s3api/s3err/s3api_errors.go
index cccef0227..f95652afb 100644
--- a/weed/s3api/s3err/s3api_errors.go
+++ b/weed/s3api/s3err/s3api_errors.go
@@ -61,6 +61,7 @@ const (
ErrInternalError
ErrInvalidCopyDest
ErrInvalidCopySource
+ ErrInvalidTag
ErrAuthHeaderEmpty
ErrSignatureVersionNotSupported
ErrMalformedPOSTRequest
@@ -188,6 +189,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Copy Source must mention the source bucket and key: sourcebucket/sourcekey.",
HTTPStatusCode: http.StatusBadRequest,
},
+ ErrInvalidTag: {
+ Code: "InvalidArgument",
+ Description: "The Tag value you have provided is invalid",
+ HTTPStatusCode: http.StatusBadRequest,
+ },
ErrMalformedXML: {
Code: "MalformedXML",
Description: "The XML you provided was not well-formed or did not validate against our published schema.",
diff --git a/weed/s3api/stats.go b/weed/s3api/stats.go
index 16a546c66..b667b32a0 100644
--- a/weed/s3api/stats.go
+++ b/weed/s3api/stats.go
@@ -4,18 +4,35 @@ import (
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
+ "strconv"
"time"
)
-func track(f http.HandlerFunc, action string) http.HandlerFunc {
+type StatusRecorder struct {
+ http.ResponseWriter
+ Status int
+}
- return func(w http.ResponseWriter, r *http.Request) {
+func NewStatusResponseWriter(w http.ResponseWriter) *StatusRecorder {
+ return &StatusRecorder{w, http.StatusOK}
+}
- w.Header().Set("Server", "SeaweedFS S3 "+util.VERSION)
+func (r *StatusRecorder) WriteHeader(status int) {
+ r.Status = status
+ r.ResponseWriter.WriteHeader(status)
+}
+func (r *StatusRecorder) Flush() {
+ r.ResponseWriter.(http.Flusher).Flush()
+}
+
+func track(f http.HandlerFunc, action string) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS S3 "+util.VERSION)
+ recorder := NewStatusResponseWriter(w)
start := time.Now()
- stats_collect.S3RequestCounter.WithLabelValues(action).Inc()
- f(w, r)
+ f(recorder, r)
stats_collect.S3RequestHistogram.WithLabelValues(action).Observe(time.Since(start).Seconds())
+ stats_collect.S3RequestCounter.WithLabelValues(action, strconv.Itoa(recorder.Status)).Inc()
}
}
diff --git a/weed/s3api/tags.go b/weed/s3api/tags.go
new file mode 100644
index 000000000..9ff7d1fba
--- /dev/null
+++ b/weed/s3api/tags.go
@@ -0,0 +1,38 @@
+package s3api
+
+import (
+ "encoding/xml"
+)
+
+type Tag struct {
+ Key string `xml:"Key"`
+ Value string `xml:"Value"`
+}
+
+type TagSet struct {
+ Tag []Tag `xml:"Tag"`
+}
+
+type Tagging struct {
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Tagging"`
+ TagSet TagSet `xml:"TagSet"`
+}
+
+func (t *Tagging) ToTags() map[string]string {
+ output := make(map[string]string)
+ for _, tag := range t.TagSet.Tag {
+ output[tag.Key] = tag.Value
+ }
+ return output
+}
+
+func FromTags(tags map[string]string) (t *Tagging) {
+ t = &Tagging{}
+ for k, v := range tags {
+ t.TagSet.Tag = append(t.TagSet.Tag, Tag{
+ Key: k,
+ Value: v,
+ })
+ }
+ return
+}
diff --git a/weed/s3api/tags_test.go b/weed/s3api/tags_test.go
new file mode 100644
index 000000000..887843d6f
--- /dev/null
+++ b/weed/s3api/tags_test.go
@@ -0,0 +1,50 @@
+package s3api
+
+import (
+ "encoding/xml"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestXMLUnmarshall(t *testing.T) {
+
+ input := `<?xml version="1.0" encoding="UTF-8"?>
+<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <TagSet>
+ <Tag>
+ <Key>key1</Key>
+ <Value>value1</Value>
+ </Tag>
+ </TagSet>
+</Tagging>
+`
+
+ tags := &Tagging{}
+
+ xml.Unmarshal([]byte(input), tags)
+
+ assert.Equal(t, len(tags.TagSet.Tag), 1)
+ assert.Equal(t, tags.TagSet.Tag[0].Key, "key1")
+ assert.Equal(t, tags.TagSet.Tag[0].Value, "value1")
+
+}
+
+func TestXMLMarshall(t *testing.T) {
+ tags := &Tagging{
+ TagSet: TagSet{
+ []Tag{
+ {
+ Key: "key1",
+ Value: "value1",
+ },
+ },
+ },
+ }
+
+ actual := string(encodeResponse(tags))
+
+ expected := `<?xml version="1.0" encoding="UTF-8"?>
+<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><TagSet><Tag><Key>key1</Key><Value>value1</Value></Tag></TagSet></Tagging>`
+ assert.Equal(t, expected, actual)
+
+}
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index ecd23413f..943dbd2a2 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -32,12 +32,12 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
return &filer_pb.LookupDirectoryEntryResponse{
Entry: &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: entry.IsDirectory(),
- Attributes: filer.EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- HardLinkId: entry.HardLinkId,
+ Name: req.Name,
+ IsDirectory: entry.IsDirectory(),
+ Attributes: filer.EntryAttributeToPb(entry),
+ Chunks: entry.Chunks,
+ Extended: entry.Extended,
+ HardLinkId: entry.HardLinkId,
HardLinkCounter: entry.HardLinkCounter,
},
}, nil
@@ -77,12 +77,12 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
if err := stream.Send(&filer_pb.ListEntriesResponse{
Entry: &filer_pb.Entry{
- Name: entry.Name(),
- IsDirectory: entry.IsDirectory(),
- Chunks: entry.Chunks,
- Attributes: filer.EntryAttributeToPb(entry),
- Extended: entry.Extended,
- HardLinkId: entry.HardLinkId,
+ Name: entry.Name(),
+ IsDirectory: entry.IsDirectory(),
+ Chunks: entry.Chunks,
+ Attributes: filer.EntryAttributeToPb(entry),
+ Extended: entry.Extended,
+ HardLinkId: entry.HardLinkId,
HardLinkCounter: entry.HardLinkCounter,
},
}); err != nil {
@@ -135,16 +135,19 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return resp, nil
}
-func (fs *FilerServer) lookupFileId(fileId string) (targetUrl string, err error) {
+func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) {
fid, err := needle.ParseFileIdFromString(fileId)
if err != nil {
- return "", err
+ return nil, err
}
locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
if !found || len(locations) == 0 {
- return "", fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
+ return nil, fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
+ }
+ for _, loc := range locations {
+ targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
}
- return fmt.Sprintf("http://%s/%s", locations[0].Url, fileId), nil
+ return
}
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
@@ -159,11 +162,11 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
}
createErr := fs.filer.CreateEntry(ctx, &filer.Entry{
- FullPath: util.JoinPath(req.Directory, req.Entry.Name),
- Attr: filer.PbToEntryAttribute(req.Entry.Attributes),
- Chunks: chunks,
- Extended: req.Entry.Extended,
- HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
+ FullPath: util.JoinPath(req.Directory, req.Entry.Name),
+ Attr: filer.PbToEntryAttribute(req.Entry.Attributes),
+ Chunks: chunks,
+ Extended: req.Entry.Extended,
+ HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
HardLinkCounter: req.Entry.HardLinkCounter,
}, req.OExcl, req.IsFromOtherCluster, req.Signatures)
@@ -193,11 +196,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
newEntry := &filer.Entry{
- FullPath: util.JoinPath(req.Directory, req.Entry.Name),
- Attr: entry.Attr,
- Extended: req.Entry.Extended,
- Chunks: chunks,
- HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
+ FullPath: util.JoinPath(req.Directory, req.Entry.Name),
+ Attr: entry.Attr,
+ Extended: req.Entry.Extended,
+ Chunks: chunks,
+ HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
HardLinkCounter: req.Entry.HardLinkCounter,
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index ec0a4fb3e..59c149cef 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
"os"
"sync"
@@ -15,7 +16,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -92,8 +92,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
})
fs.filer.Cipher = option.Cipher
- fs.maybeStartMetrics()
+ fs.checkWithMaster()
+ go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepConnectedToMaster()
v := util.GetViper()
@@ -135,7 +136,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
return fs, nil
}
-func (fs *FilerServer) maybeStartMetrics() {
+func (fs *FilerServer) checkWithMaster() {
for _, master := range fs.option.Masters {
_, err := pb.ParseFilerGrpcAddress(master)
@@ -145,10 +146,19 @@ func (fs *FilerServer) maybeStartMetrics() {
}
isConnected := false
- var readErr error
for !isConnected {
for _, master := range fs.option.Masters {
- fs.metricsAddress, fs.metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
+ readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", master, err)
+ }
+ fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ if fs.option.DefaultReplication == "" {
+ fs.option.DefaultReplication = resp.DefaultReplication
+ }
+ return nil
+ })
if readErr == nil {
isConnected = true
} else {
@@ -157,17 +167,4 @@ func (fs *FilerServer) maybeStartMetrics() {
}
}
- go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
-}
-
-func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
- err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
- resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
- if err != nil {
- return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
- }
- metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
- return nil
- })
- return
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 61011fc20..2b37e3c5d 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -167,6 +167,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
TtlSec: ttlSec,
Mime: contentType,
Md5: md5bytes,
+ FileSize: uint64(chunkOffset),
},
Chunks: fileChunks,
}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 692909a29..e8fa3995d 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"net"
"strings"
"time"
@@ -302,3 +303,19 @@ func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.Li
}
return resp, nil
}
+
+func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
+
+ // tell the volume servers about the leader
+ leader, _ := ms.Topo.Leader()
+
+ resp := &master_pb.GetMasterConfigurationResponse{
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ StorageBackends: backend.ToPbStorageBackends(),
+ DefaultReplication: ms.option.DefaultReplicaPlacement,
+ Leader: leader,
+ }
+
+ return resp, nil
+}
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 168975fb6..03b718291 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,8 +3,6 @@ package weed_server
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
-
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -179,14 +177,3 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku
return resp, nil
}
-
-func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
-
- resp := &master_pb.GetMasterConfigurationResponse{
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
- StorageBackends: backend.ToPbStorageBackends(),
- }
-
- return resp, nil
-}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 657b170c2..cc1c4b2ad 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -138,14 +138,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("event: %+v", e)
+ glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
if ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
})
- ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("state change: %+v", e)
- })
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 958680d2b..073c1ff16 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,10 +2,8 @@ package weed_server
import (
"encoding/json"
- "io/ioutil"
"os"
"path"
- "reflect"
"sort"
"time"
@@ -28,7 +26,31 @@ type RaftServer struct {
*raft.GrpcServer
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
+type StateMachine struct {
+ raft.StateMachine
+ topo *topology.Topology
+}
+
+func (s StateMachine) Save() ([]byte, error) {
+ state := topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }
+ glog.V(1).Infof("Save raft state %+v", state)
+ return json.Marshal(state)
+}
+
+func (s StateMachine) Recovery(data []byte) error {
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(data, &state)
+ if err != nil {
+ return err
+ }
+ glog.V(1).Infof("Recovery raft state %+v", state)
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+ return nil
+}
+
+func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int, raftResumeState bool) (*RaftServer, error) {
s := &RaftServer{
peers: peers,
serverAddr: serverAddr,
@@ -46,48 +68,66 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
transporter := raft.NewGrpcTransporter(grpcDialOption)
glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
- // always clear previous metadata
- os.RemoveAll(path.Join(s.dataDir, "conf"))
- os.RemoveAll(path.Join(s.dataDir, "log"))
- os.RemoveAll(path.Join(s.dataDir, "snapshot"))
- // Clear old cluster configurations if peers are changed
- if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
- glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
+ if !raftResumeState {
+ // always clear previous metadata
+ os.RemoveAll(path.Join(s.dataDir, "conf"))
+ os.RemoveAll(path.Join(s.dataDir, "log"))
+ os.RemoveAll(path.Join(s.dataDir, "snapshot"))
+ }
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
+ return nil, err
}
- s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "")
+ stateMachine := StateMachine{topo: topo}
+ s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
if err != nil {
glog.V(0).Infoln(err)
- return nil
+ return nil, err
}
s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
- s.raftServer.Start()
+ if err := s.raftServer.LoadSnapshot(); err != nil {
+ return nil, err
+ }
+ if err := s.raftServer.Start(); err != nil {
+ return nil, err
+ }
for _, peer := range s.peers {
- s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer))
+ if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
+ return nil, err
+ }
+ }
+
+ // Remove deleted peers
+ for existsPeerName := range s.raftServer.Peers() {
+ exists, existingPeer := false, ""
+ for _, peer := range s.peers {
+ if pb.ServerToGrpcAddress(peer) == existsPeerName {
+ exists, existingPeer = true, peer
+ break
+ }
+ }
+ if exists {
+ if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
+ glog.V(0).Infoln(err)
+ return nil, err
+ } else {
+ glog.V(0).Infof("removing old peer %s", existingPeer)
+ }
+ }
}
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
// Initialize the server by joining itself.
- glog.V(0).Infoln("Initializing new cluster")
-
- _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
- Name: s.raftServer.Name(),
- ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
- })
-
- if err != nil {
- glog.V(0).Infoln(err)
- return nil
- }
+ // s.DoJoinCommand()
}
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
- return s
+ return s, nil
}
func (s *RaftServer) Peers() (members []string) {
@@ -100,34 +140,6 @@ func (s *RaftServer) Peers() (members []string) {
return
}
-func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) {
- confPath := path.Join(dir, "conf")
- // open conf file
- b, err := ioutil.ReadFile(confPath)
- if err != nil {
- return oldPeers, true
- }
- conf := &raft.Config{}
- if err = json.Unmarshal(b, conf); err != nil {
- return oldPeers, true
- }
-
- for _, p := range conf.Peers {
- oldPeers = append(oldPeers, p.Name)
- }
- oldPeers = append(oldPeers, self)
-
- if len(peers) == 0 && len(oldPeers) <= 1 {
- return oldPeers, false
- }
-
- sort.Strings(peers)
- sort.Strings(oldPeers)
-
- return oldPeers, !reflect.DeepEqual(peers, oldPeers)
-
-}
-
func isTheFirstOne(self string, peers []string) bool {
sort.Strings(peers)
if len(peers) <= 0 {
@@ -135,3 +147,16 @@ func isTheFirstOne(self string, peers []string) bool {
}
return self == peers[0]
}
+
+func (s *RaftServer) DoJoinCommand() {
+
+ glog.V(0).Infoln("Initializing new cluster")
+
+ if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
+ Name: s.raftServer.Name(),
+ ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
+ }); err != nil {
+ glog.Errorf("fail to send join command: %v", err)
+ }
+
+}
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index fd38cb977..252570eab 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -1,20 +1,24 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"net/http"
)
type ClusterStatusResult struct {
- IsLeader bool `json:"IsLeader,omitempty"`
- Leader string `json:"Leader,omitempty"`
- Peers []string `json:"Peers,omitempty"`
+ IsLeader bool `json:"IsLeader,omitempty"`
+ Leader string `json:"Leader,omitempty"`
+ Peers []string `json:"Peers,omitempty"`
+ MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"`
}
func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
ret := ClusterStatusResult{
- IsLeader: s.topo.IsLeader(),
- Peers: s.Peers(),
+ IsLeader: s.topo.IsLeader(),
+ Peers: s.Peers(),
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
}
+
if leader, e := s.topo.Leader(); e == nil {
ret.Leader = leader
}
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 8698a4c64..199f8faba 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -90,7 +90,7 @@ func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
return true
}
vs.isHeartbeating = false
- vs.stopChan <- true
+ close(vs.stopChan)
return false
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index bb04678d6..15fd446e7 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -93,6 +93,10 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
} else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(volumeId, n)
}
+ if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume {
+ glog.V(4).Infof("read needle: %v", err)
+ // start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request
+ }
// glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
glog.V(3).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 78cbf08c5..01a77b901 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -13,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
@@ -67,7 +68,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ret.Name = string(reqNeedle.Name)
}
ret.Size = uint32(originalSize)
- ret.ETag = reqNeedle.Etag()
+ ret.ETag = fmt.Sprintf("%x", util.Base64Md5ToBytes(contentMd5))
ret.Mime = string(reqNeedle.Mime)
setEtag(w, ret.ETag)
w.Header().Set("Content-MD5", contentMd5)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 121c0d2bb..f13e73a7b 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -100,7 +100,7 @@ type WebDavFile struct {
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
- chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB)
+ chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB, 1024*1024)
return &WebDavFileSystem{
option: option,
chunkCache: chunkCache,
diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go
index d930caf0f..a60cda290 100644
--- a/weed/stats/metrics.go
+++ b/weed/stats/metrics.go
@@ -16,7 +16,7 @@ import (
)
var (
- Gather = prometheus.NewRegistry()
+ Gather = prometheus.NewRegistry()
FilerRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -99,7 +99,7 @@ var (
Subsystem: "s3",
Name: "request_total",
Help: "Counter of s3 requests.",
- }, []string{"type"})
+ }, []string{"type", "code"})
S3RequestHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "SeaweedFS",
diff --git a/weed/storage/backend/volume_create_linux.go b/weed/storage/backend/volume_create_linux.go
index 4602831ca..260c2c2a3 100644
--- a/weed/storage/backend/volume_create_linux.go
+++ b/weed/storage/backend/volume_create_linux.go
@@ -16,7 +16,7 @@ func CreateVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32
}
if preallocate != 0 {
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
- glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
+ glog.V(1).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
}
return NewDiskFile(file), nil
}
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 9ecc57459..c309b3f92 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -174,9 +174,6 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er
}
func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) {
- l.volumesLock.Lock()
- defer l.volumesLock.Unlock()
-
v, ok := l.volumes[vid]
if !ok {
return
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 48cbeb3d1..d5d59235a 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -380,10 +380,12 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
Ttl: v.Ttl.ToUint32(),
}
for _, location := range s.Locations {
- if found, err := location.deleteVolumeById(i); found && err == nil {
+ if err := location.DeleteVolume(i); err == nil {
glog.V(0).Infof("DeleteVolume %d", i)
s.DeletedVolumesChan <- message
return nil
+ } else {
+ glog.Errorf("DeleteVolume %d: %v", i, err)
}
}
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index bd7bdacbd..853757ce3 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -128,7 +128,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
return 0, fmt.Errorf("locate in local ec volume: %v", err)
}
if size.IsDeleted() {
- return 0, fmt.Errorf("entry %s is deleted", n.Id)
+ return 0, ErrorDeleted
}
glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
@@ -141,7 +141,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
}
if isDeleted {
- return 0, fmt.Errorf("ec entry %s is deleted", n.Id)
+ return 0, ErrorDeleted
}
err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version)
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index e11bde2cb..10c87c8ea 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -16,6 +16,7 @@ import (
)
var ErrorNotFound = errors.New("not found")
+var ErrorDeleted = errors.New("already deleted")
// isFileUnchanged checks whether this needle to write is same as last one.
// It requires serialized access in the same volume.
@@ -266,7 +267,7 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
glog.V(3).Infof("reading deleted %s", n.String())
readSize = -readSize
} else {
- return -1, errors.New("already deleted")
+ return -1, ErrorDeleted
}
}
if readSize == 0 {
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index 2b0c635a1..3615aee0e 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -7,12 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
-const (
- memCacheSizeLimit = 1024 * 1024
- onDiskCacheSizeLimit0 = memCacheSizeLimit
- onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit
-)
-
type ChunkCache interface {
GetChunk(fileId string, minSize uint64) (data []byte)
SetChunk(fileId string, data []byte)
@@ -23,17 +17,23 @@ type TieredChunkCache struct {
memCache *ChunkCacheInMemory
diskCaches []*OnDiskCacheLayer
sync.RWMutex
+ onDiskCacheSizeLimit0 uint64
+ onDiskCacheSizeLimit1 uint64
+ onDiskCacheSizeLimit2 uint64
}
-func NewTieredChunkCache(maxEntries int64, dir string, diskSizeMB int64) *TieredChunkCache {
+func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, unitSize int64) *TieredChunkCache {
c := &TieredChunkCache{
memCache: NewChunkCacheInMemory(maxEntries),
}
c.diskCaches = make([]*OnDiskCacheLayer, 3)
- c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4)
- c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4)
- c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4)
+ c.onDiskCacheSizeLimit0 = uint64(unitSize)
+ c.onDiskCacheSizeLimit1 = 4 * c.onDiskCacheSizeLimit0
+ c.onDiskCacheSizeLimit2 = 2 * c.onDiskCacheSizeLimit1
+ c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_2", diskSizeInUnit*unitSize/8, 2)
+ c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_3", diskSizeInUnit*unitSize/4+diskSizeInUnit*unitSize/8, 3)
+ c.diskCaches[2] = NewOnDiskCacheLayer(dir, "c2_2", diskSizeInUnit*unitSize/2, 2)
return c
}
@@ -51,7 +51,7 @@ func (c *TieredChunkCache) GetChunk(fileId string, minSize uint64) (data []byte)
func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byte) {
- if minSize < memCacheSizeLimit {
+ if minSize <= c.onDiskCacheSizeLimit0 {
data = c.memCache.GetChunk(fileId)
if len(data) >= int(minSize) {
return data
@@ -64,13 +64,13 @@ func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byt
return nil
}
- if minSize < onDiskCacheSizeLimit0 {
+ if minSize <= c.onDiskCacheSizeLimit0 {
data = c.diskCaches[0].getChunk(fid.Key)
if len(data) >= int(minSize) {
return data
}
}
- if minSize < onDiskCacheSizeLimit1 {
+ if minSize <= c.onDiskCacheSizeLimit1 {
data = c.diskCaches[1].getChunk(fid.Key)
if len(data) >= int(minSize) {
return data
@@ -101,7 +101,7 @@ func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
- if len(data) < memCacheSizeLimit {
+ if len(data) <= int(c.onDiskCacheSizeLimit0) {
c.memCache.SetChunk(fileId, data)
}
@@ -111,9 +111,9 @@ func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
return
}
- if len(data) < onDiskCacheSizeLimit0 {
+ if len(data) <= int(c.onDiskCacheSizeLimit0) {
c.diskCaches[0].setChunk(fid.Key, data)
- } else if len(data) < onDiskCacheSizeLimit1 {
+ } else if len(data) <= int(c.onDiskCacheSizeLimit1) {
c.diskCaches[1].setChunk(fid.Key, data)
} else {
c.diskCaches[2].setChunk(fid.Key, data)
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
index 558488f18..f8325276e 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
@@ -14,9 +14,9 @@ func TestOnDisk(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "c")
defer os.RemoveAll(tmpDir)
- totalDiskSizeMb := int64(32)
+ totalDiskSizeInKB := int64(32)
- cache := NewTieredChunkCache(0, tmpDir, totalDiskSizeMb)
+ cache := NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024)
writeCount := 5
type test_data struct {
@@ -26,7 +26,7 @@ func TestOnDisk(t *testing.T) {
}
testData := make([]*test_data, writeCount)
for i := 0; i < writeCount; i++ {
- buff := make([]byte, 1024*1024)
+ buff := make([]byte, 1024)
rand.Read(buff)
testData[i] = &test_data{
data: buff,
@@ -34,9 +34,22 @@ func TestOnDisk(t *testing.T) {
size: uint64(len(buff)),
}
cache.SetChunk(testData[i].fileId, testData[i].data)
+
+ // read back right after write
+ data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ if bytes.Compare(data, testData[i].data) != 0 {
+ t.Errorf("failed to write to and read from cache: %d", i)
+ }
}
- for i := 0; i < writeCount; i++ {
+ for i := 0; i < 2; i++ {
+ data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ if bytes.Compare(data, testData[i].data) == 0 {
+ t.Errorf("old cache should have been purged: %d", i)
+ }
+ }
+
+ for i := 2; i < writeCount; i++ {
data := cache.GetChunk(testData[i].fileId, testData[i].size)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
@@ -45,9 +58,35 @@ func TestOnDisk(t *testing.T) {
cache.Shutdown()
- cache = NewTieredChunkCache(0, tmpDir, totalDiskSizeMb)
+ cache = NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024)
- for i := 0; i < writeCount; i++ {
+ for i := 0; i < 2; i++ {
+ data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ if bytes.Compare(data, testData[i].data) == 0 {
+ t.Errorf("old cache should have been purged: %d", i)
+ }
+ }
+
+ for i := 2; i < writeCount; i++ {
+ if i == 4 {
+ // FIXME this failed many times on build machines
+ /*
+ I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_0.dat
+ I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_1.dat
+ I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 4096 bytes disk space for /tmp/c578652251/c1_3_0.dat
+ I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 4096 bytes disk space for /tmp/c578652251/c1_3_1.dat
+ I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 4096 bytes disk space for /tmp/c578652251/c1_3_2.dat
+ I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 8192 bytes disk space for /tmp/c578652251/c2_2_0.dat
+ I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 8192 bytes disk space for /tmp/c578652251/c2_2_1.dat
+ I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_0.dat
+ I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_1.dat
+ --- FAIL: TestOnDisk (0.19s)
+ chunk_cache_on_disk_test.go:73: failed to write to and read from cache: 4
+ FAIL
+ FAIL github.com/chrislusf/seaweedfs/weed/util/chunk_cache 0.199s
+ */
+ continue
+ }
data := cache.GetChunk(testData[i].fileId, testData[i].size)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go
index c3192b548..eebd89798 100644
--- a/weed/util/chunk_cache/on_disk_cache_layer.go
+++ b/weed/util/chunk_cache/on_disk_cache_layer.go
@@ -14,17 +14,17 @@ type OnDiskCacheLayer struct {
diskCaches []*ChunkCacheVolume
}
-func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer {
+func NewOnDiskCacheLayer(dir, namePrefix string, diskSize int64, segmentCount int) *OnDiskCacheLayer {
- volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
+ volumeCount, volumeSize := int(diskSize/(30000*1024*1024)), int64(30000*1024*1024)
if volumeCount < segmentCount {
- volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
+ volumeCount, volumeSize = segmentCount, diskSize/int64(segmentCount)
}
c := &OnDiskCacheLayer{}
for i := 0; i < volumeCount; i++ {
fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i))
- diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
+ diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize)
if err != nil {
glog.Errorf("failed to add cache %s : %v", fileName, err)
} else {
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 6734af7d4..0f3fd52c7 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 00)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 03)
COMMIT = ""
)
diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go
index 4ce8a2f90..f2119707e 100644
--- a/weed/util/fullpath.go
+++ b/weed/util/fullpath.go
@@ -13,6 +13,7 @@ func NewFullPath(dir, name string) FullPath {
func (fp FullPath) DirAndName() (string, string) {
dir, name := filepath.Split(string(fp))
+ name = strings.ToValidUTF8(name, "?")
if dir == "/" {
return dir, name
}
@@ -24,6 +25,7 @@ func (fp FullPath) DirAndName() (string, string) {
func (fp FullPath) Name() string {
_, name := filepath.Split(string(fp))
+ name = strings.ToValidUTF8(name, "?")
return name
}
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 3d23d8f13..7b0f73ce9 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -52,6 +52,32 @@ func (mc *MasterClient) KeepConnectedToMaster() {
}
}
+func (mc *MasterClient) FindLeader(myMasterAddress string) (leader string) {
+ for _, master := range mc.masters {
+ if master == myMasterAddress {
+ continue
+ }
+ if grpcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
+ defer cancel()
+ resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ leader = resp.Leader
+ return nil
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", master, grpcErr)
+ }
+ if leader != "" {
+ glog.V(0).Infof("existing leader is %s", leader)
+ return
+ }
+ }
+ glog.V(0).Infof("No existing leader found!")
+ return
+}
+
func (mc *MasterClient) tryAllMasters() {
nextHintedLeader := ""
for _, master := range mc.masters {
@@ -75,7 +101,7 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
stream, err := client.KeepConnected(ctx)
if err != nil {
- glog.V(0).Infof("%s masterClient failed to keep connected to %s: %v", mc.clientType, master, err)
+ glog.V(1).Infof("%s masterClient failed to keep connected to %s: %v", mc.clientType, master, err)
return err
}
@@ -118,7 +144,7 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
})
if gprcErr != nil {
- glog.V(0).Infof("%s masterClient failed to connect with master %v: %v", mc.clientType, master, gprcErr)
+ glog.V(1).Infof("%s masterClient failed to connect with master %v: %v", mc.clientType, master, gprcErr)
}
return
}
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 97df49cb6..cee2da6e1 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -44,38 +44,36 @@ func (vc *vidMap) getLocationIndex(length int) (int, error) {
return int(atomic.AddInt32(&vc.cursor, 1)) % length, nil
}
-func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) {
+func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
id, err := strconv.Atoi(vid)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)
- return "", err
+ return nil, err
}
- return vc.GetRandomLocation(uint32(id))
-}
-
-func (vc *vidMap) LookupFileId(fileId string) (fullUrl string, err error) {
- parts := strings.Split(fileId, ",")
- if len(parts) != 2 {
- return "", errors.New("Invalid fileId " + fileId)
+ locations, found := vc.GetLocations(uint32(id))
+ if !found {
+ return nil, fmt.Errorf("volume %d not found", id)
}
- serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0])
- if lookupError != nil {
- return "", lookupError
+ for _, loc := range locations {
+ serverUrls = append(serverUrls, loc.Url)
}
- return "http://" + serverUrl + "/" + fileId, nil
+ return
}
-func (vc *vidMap) LookupVolumeServer(fileId string) (volumeServer string, err error) {
+func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
- return "", errors.New("Invalid fileId " + fileId)
+ return nil, errors.New("Invalid fileId " + fileId)
}
- serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0])
+ serverUrls, lookupError := vc.LookupVolumeServerUrl(parts[0])
if lookupError != nil {
- return "", lookupError
+ return nil, lookupError
}
- return serverUrl, nil
+ for _, serverUrl := range serverUrls {
+ fullUrls = append(fullUrls, "http://"+serverUrl+"/"+fileId)
+ }
+ return
}
func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error) {
@@ -99,23 +97,6 @@ func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
return
}
-func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) {
- vc.RLock()
- defer vc.RUnlock()
-
- locations := vc.vid2Locations[vid]
- if len(locations) == 0 {
- return "", fmt.Errorf("volume %d not found", vid)
- }
-
- index, err := vc.getLocationIndex(len(locations))
- if err != nil {
- return "", fmt.Errorf("volume %d: %v", vid, err)
- }
-
- return locations[index].Url, nil
-}
-
func (vc *vidMap) addLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()