aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-12 14:03:07 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-12 14:03:07 -0700
commitd30483d642b6115d32a7d373a33538da25e45f28 (patch)
treedbfa045fedd5c066287e0a07017698e78a7d47a5
parent6f948e48879ed0706b044bea0429d3fc48d6e8e1 (diff)
downloadseaweedfs-d30483d642b6115d32a7d373a33538da25e45f28.tar.xz
seaweedfs-d30483d642b6115d32a7d373a33538da25e45f28.zip
re-enable system logs
-rw-r--r--unmaintained/see_log_entry/see_log_entry.go3
-rw-r--r--weed/filer2/filer.go6
-rw-r--r--weed/filer2/filer_notify.go9
-rw-r--r--weed/filer2/filer_notify_append.go47
-rw-r--r--weed/filer2/leveldb/leveldb_store_test.go4
-rw-r--r--weed/filer2/leveldb2/leveldb2_store_test.go4
-rw-r--r--weed/filer2/topics.go6
-rw-r--r--weed/server/filer_grpc_server_listen.go3
-rw-r--r--weed/server/filer_server.go2
9 files changed, 53 insertions, 31 deletions
diff --git a/unmaintained/see_log_entry/see_log_entry.go b/unmaintained/see_log_entry/see_log_entry.go
index b7d724344..69503b65d 100644
--- a/unmaintained/see_log_entry/see_log_entry.go
+++ b/unmaintained/see_log_entry/see_log_entry.go
@@ -9,12 +9,13 @@ import (
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
- logdataFile = flag.String("logdata", "", "log data file saved under /.meta/log/...")
+ logdataFile = flag.String("logdata", "", "log data file saved under "+ filer2.SystemLogDir)
)
func main() {
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index df6379d26..acd609847 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -36,9 +36,11 @@ type Filer struct {
buckets *FilerBuckets
Cipher bool
metaLogBuffer *log_buffer.LogBuffer
+ metaLogCollection string
+ metaLogReplication string
}
-func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, notifyFn func()) *Filer {
+func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerGrpcPort, masters),
@@ -46,6 +48,8 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort ui
GrpcDialOption: grpcDialOption,
}
f.metaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
+ f.metaLogCollection = collection
+ f.metaLogReplication = replication
go f.loopProcessingDeletion()
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index de07e1cf9..18f96658b 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -25,7 +25,7 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
// println("fullpath:", fullpath)
- if strings.HasPrefix(fullpath, "/.meta") {
+ if strings.HasPrefix(fullpath, SystemLogDir) {
return
}
@@ -69,11 +69,10 @@ func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventN
func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
- return
-
- targetFile := fmt.Sprintf("/.meta/log/%04d/%02d/%02d/%02d/%02d/%02d.%09d.log",
+ targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
- startTime.Second(), startTime.Nanosecond())
+ // startTime.Second(), startTime.Nanosecond(),
+ )
if err := f.appendToFile(targetFile, buf); err != nil {
glog.V(0).Infof("log write failed %s: %v", targetFile, err)
diff --git a/weed/filer2/filer_notify_append.go b/weed/filer2/filer_notify_append.go
index 4c134ae66..0e6e8d50f 100644
--- a/weed/filer2/filer_notify_append.go
+++ b/weed/filer2/filer_notify_append.go
@@ -13,25 +13,10 @@ import (
func (f *Filer) appendToFile(targetFile string, data []byte) error {
- // assign a volume location
- assignRequest := &operation.VolumeAssignRequest{
- Count: 1,
- }
- assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
- if err != nil {
- return fmt.Errorf("AssignVolume: %v", err)
- }
- if assignResult.Error != "" {
- return fmt.Errorf("AssignVolume error: %v", assignResult.Error)
- }
-
- // upload data
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
- uploadResult, err := operation.UploadData(targetUrl, "", false, data, false, "", nil, assignResult.Auth)
- if err != nil {
- return fmt.Errorf("upload data %s: %v", targetUrl, err)
+ assignResult, err, uploadResult, err2 := f.assignAndUpload(data)
+ if err2 != nil {
+ return err2
}
- // println("uploaded to", targetUrl)
// find out existing entry
fullpath := util.FullPath(targetFile)
@@ -68,3 +53,29 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
return err
}
+
+func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, error, *operation.UploadResult, error) {
+ // assign a volume location
+ assignRequest := &operation.VolumeAssignRequest{
+ Count: 1,
+ Collection: f.metaLogCollection,
+ Replication: f.metaLogReplication,
+ WritableVolumeCount: 1,
+ }
+ assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
+ if err != nil {
+ return nil, nil, nil, fmt.Errorf("AssignVolume: %v", err)
+ }
+ if assignResult.Error != "" {
+ return nil, nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
+ }
+
+ // upload data
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ uploadResult, err := operation.UploadData(targetUrl, "", false, data, false, "", nil, assignResult.Auth)
+ if err != nil {
+ return nil, nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+ }
+ // println("uploaded to", targetUrl)
+ return assignResult, err, uploadResult, nil
+}
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 21d126322..4f415bb9c 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -11,7 +11,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, 0, nil)
+ filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
@@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, 0, nil)
+ filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
index 324b07d6c..d4ab2c163 100644
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -11,7 +11,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, 0, nil)
+ filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
@@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, 0, nil)
+ filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
diff --git a/weed/filer2/topics.go b/weed/filer2/topics.go
new file mode 100644
index 000000000..8ab409774
--- /dev/null
+++ b/weed/filer2/topics.go
@@ -0,0 +1,6 @@
+package filer2
+
+const(
+ TopicsDir = "/topics"
+ SystemLogDir = TopicsDir + "/.system/log"
+)
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go
index 9c98f74e5..e2d932a99 100644
--- a/weed/server/filer_grpc_server_listen.go
+++ b/weed/server/filer_grpc_server_listen.go
@@ -4,6 +4,7 @@ import (
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -37,7 +38,7 @@ func (fs *FilerServer) ListenForEvents(req *filer_pb.ListenForEventsRequest, str
fullpath := util.Join(dirPath, entryName)
// skip on filer internal meta logs
- if strings.HasPrefix(fullpath, "/.meta") {
+ if strings.HasPrefix(fullpath, filer2.SystemLogDir) {
return nil
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index d8761a538..96a5545f4 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -73,7 +73,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000, fs.notifyMetaListeners)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000, option.Collection, option.DefaultReplication, fs.notifyMetaListeners)
fs.filer.Cipher = option.Cipher
maybeStartMetrics(fs, option)