aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java2
-rw-r--r--weed/filer/read_write.go53
-rw-r--r--weed/filer/s3iam_conf.go3
-rw-r--r--weed/filer/s3iam_conf_test.go2
-rw-r--r--weed/iamapi/iamapi_server.go4
-rw-r--r--weed/s3api/auth_credentials.go10
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go29
-rw-r--r--weed/server/filer_server_handlers_write_upload.go2
-rw-r--r--weed/shell/command_fs_configure.go4
-rw-r--r--weed/shell/command_s3_configure.go2
-rw-r--r--weed/shell/command_volume_fsck.go4
-rw-r--r--weed/wdclient/exclusive_locks/exclusive_locker.go2
12 files changed, 54 insertions, 63 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
index 51053becd..19ae78277 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -10,7 +10,7 @@ import java.util.List;
public class ByteBufferPool {
- private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024;
+ private static final int MIN_BUFFER_SIZE = 1 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
private static final List<ByteBuffer> bufferList = new ArrayList<>();
diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go
index c4c90fb63..14e8cab1e 100644
--- a/weed/filer/read_write.go
+++ b/weed/filer/read_write.go
@@ -2,13 +2,9 @@ package filer
import (
"bytes"
- "fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "io/ioutil"
"math"
- "net/http"
"time"
)
@@ -31,50 +27,17 @@ func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.Seaweed
}
-func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
-
- target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
-
- data, _, err := util.Get(target)
-
- return data, err
-}
-
-func SaveAs(host string, port int, dir, name string, contentType string, byteBuffer *bytes.Buffer) error {
- var target string
- if port == 0 {
- target = fmt.Sprintf("http://%s%s/%s", host, dir, name)
- } else {
- target = fmt.Sprintf("http://%s:%d%s/%s", host, port, dir, name)
- }
-
- // set the HTTP method, url, and request body
- req, err := http.NewRequest(http.MethodPut, target, byteBuffer)
- if err != nil {
- return err
- }
-
- // set the request header Content-Type for json
- if contentType != "" {
- req.Header.Set("Content-Type", contentType)
- }
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return err
+func ReadInsideFiler(filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error) {
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Directory: dir,
+ Name: name,
}
- defer util.CloseResponse(resp)
-
- b, err := ioutil.ReadAll(resp.Body)
+ respLookupEntry, err := filer_pb.LookupEntry(filerClient, request)
if err != nil {
- return err
- }
-
- if resp.StatusCode >= 400 {
- return fmt.Errorf("%s: %s %v", target, resp.Status, string(b))
+ return
}
-
- return nil
-
+ content = respLookupEntry.Entry.Content
+ return
}
func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error {
diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go
index 92387fb09..55c976915 100644
--- a/weed/filer/s3iam_conf.go
+++ b/weed/filer/s3iam_conf.go
@@ -4,6 +4,7 @@ import (
"bytes"
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
"github.com/golang/protobuf/jsonpb"
+ "github.com/golang/protobuf/proto"
"io"
)
@@ -14,7 +15,7 @@ func ParseS3ConfigurationFromBytes(content []byte, config *iam_pb.S3ApiConfigura
return nil
}
-func S3ConfigurationToText(writer io.Writer, config *iam_pb.S3ApiConfiguration) error {
+func ProtoToText(writer io.Writer, config proto.Message) error {
m := jsonpb.Marshaler{
EmitDefaults: false,
diff --git a/weed/filer/s3iam_conf_test.go b/weed/filer/s3iam_conf_test.go
index 65cc49840..da7d9c9f1 100644
--- a/weed/filer/s3iam_conf_test.go
+++ b/weed/filer/s3iam_conf_test.go
@@ -44,7 +44,7 @@ func TestS3Conf(t *testing.T) {
},
}
var buf bytes.Buffer
- err := S3ConfigurationToText(&buf, s3Conf)
+ err := ProtoToText(&buf, s3Conf)
assert.Equal(t, err, nil)
s3ConfSaved := &iam_pb.S3ApiConfiguration{}
err = ParseS3ConfigurationFromBytes(buf.Bytes(), s3ConfSaved)
diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go
index eb18e996d..037594165 100644
--- a/weed/iamapi/iamapi_server.go
+++ b/weed/iamapi/iamapi_server.go
@@ -96,8 +96,8 @@ func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat
func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
buf := bytes.Buffer{}
- if err := filer.S3ConfigurationToText(&buf, s3cfg); err != nil {
- return fmt.Errorf("S3ConfigurationToText: %s", err)
+ if err := filer.ProtoToText(&buf, s3cfg); err != nil {
+ return fmt.Errorf("ProtoToText: %s", err)
}
return pb.WithGrpcFilerClient(
iam.option.FilerGrpcAddress,
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index 3439b40df..44c3f7aa7 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -4,6 +4,8 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
@@ -51,8 +53,12 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag
return iam
}
-func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) error {
- content, err := filer.ReadContent(option.Filer, filer.IamConfigDirecotry, filer.IamIdentityFile)
+func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) {
+ var content []byte
+ err = pb.WithFilerClient(option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ content, err = filer.ReadInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile)
+ return err
+ })
if err != nil {
return fmt.Errorf("read S3 config: %v", err)
}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 624069b7e..3fdac1b26 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -2,7 +2,6 @@ package weed_server
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"strings"
"time"
@@ -12,6 +11,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+const (
+ // MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered
+ MaxUnsyncedEvents = 1e3
)
func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
@@ -25,7 +30,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
@@ -87,7 +92,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
@@ -152,12 +157,25 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati
}
}
-func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ filtered := 0
+
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ defer func() {
+ if filtered > MaxUnsyncedEvents {
+ if err := stream.Send(&filer_pb.SubscribeMetadataResponse{
+ EventNotification: &filer_pb.EventNotification{},
+ TsNs: tsNs,
+ }); err == nil {
+ filtered = 0
+ }
+ }
+ }()
+ filtered++
foundSelf := false
for _, sig := range eventNotification.Signatures {
- if sig == clientSignature && clientSignature != 0 {
+ if sig == req.Signature && req.Signature != 0 {
return nil
}
if sig == fs.filer.Signature {
@@ -204,6 +222,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
glog.V(0).Infof("=> client %v: %+v", clientName, err)
return err
}
+ filtered = 0
return nil
}
}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index e28309c6a..395852517 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -70,7 +70,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
return nil, md5Hash, 0, err, nil
}
if chunkOffset == 0 && !isAppend(r) {
- if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && dataSize < 4*1024 {
+ if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) {
chunkOffset += dataSize
smallContent = make([]byte, dataSize)
bytesBuffer.Read(smallContent)
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index 52fcae1c6..a7e601bec 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -120,7 +120,9 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
if *apply {
- if err := filer.SaveAs(commandEnv.option.FilerHost, int(commandEnv.option.FilerPort), filer.DirectoryEtcSeaweedFS, filer.FilerConfName, "text/plain; charset=utf-8", &buf); err != nil {
+ if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes())
+ }); err != nil && err != filer_pb.ErrNotFound {
return err
}
diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go
index ca51ef72f..5eab2ebd0 100644
--- a/weed/shell/command_s3_configure.go
+++ b/weed/shell/command_s3_configure.go
@@ -164,7 +164,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
}
buf.Reset()
- filer.S3ConfigurationToText(&buf, s3cfg)
+ filer.ProtoToText(&buf, s3cfg)
fmt.Fprintf(writer, string(buf.Bytes()))
fmt.Fprintln(writer)
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 64389fdb5..0b40cfb7c 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -206,7 +206,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u
if verbose {
for _, fid := range orphanFileIds {
- fmt.Fprintf(writer, "%sxxxxxxxx\n", fid)
+ fmt.Fprintf(writer, "%s\n", fid)
}
}
@@ -410,7 +410,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri
var orphanFileCount uint64
db.AscendingVisit(func(n needle_map.NeedleValue) error {
// fmt.Printf("%d,%x\n", volumeId, n.Key)
- orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String()))
+ orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String()))
orphanFileCount++
orphanDataSize += uint64(n.Size)
return nil
diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go
index 5b5fa2704..0fa138496 100644
--- a/weed/wdclient/exclusive_locks/exclusive_locker.go
+++ b/weed/wdclient/exclusive_locks/exclusive_locker.go
@@ -18,10 +18,10 @@ const (
)
type ExclusiveLocker struct {
- masterClient *wdclient.MasterClient
token int64
lockTsNs int64
isLocking bool
+ masterClient *wdclient.MasterClient
}
func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {