aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go3
-rw-r--r--weed/server/filer_server.go9
-rw-r--r--weed/server/queue_server.go49
3 files changed, 56 insertions, 5 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 30a5cc9de..35539acca 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -336,6 +336,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
- DirBuckets: fs.option.DirBucketsPath,
+ DirBuckets: fs.filer.DirBucketsPath,
+ DirQueues: fs.filer.DirQueuesPath,
}, nil
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 5fc038e17..bfb182dbe 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -46,7 +46,6 @@ type FilerOption struct {
DisableHttp bool
Port int
recursiveDelete bool
- DirBucketsPath string
}
type FilerServer struct {
@@ -67,7 +66,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.DirBucketsPath)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
go fs.filer.KeepConnectedToMaster()
@@ -84,7 +83,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
v.Set("filer.option.buckets_folder", "/buckets")
- fs.option.DirBucketsPath = v.GetString("filer.option.buckets_folder")
+ v.Set("filer.option.queues_folder", "/queues")
+ fs.filer.DirBucketsPath = v.GetString("filer.option.buckets_folder")
+ fs.filer.DirQueuesPath = v.GetString("filer.option.queues_folder")
fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.")
@@ -97,7 +98,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- fs.filer.LoadBuckets(fs.option.DirBucketsPath)
+ fs.filer.LoadBuckets(fs.filer.DirBucketsPath)
maybeStartMetrics(fs, option)
diff --git a/weed/server/queue_server.go b/weed/server/queue_server.go
new file mode 100644
index 000000000..078c76a30
--- /dev/null
+++ b/weed/server/queue_server.go
@@ -0,0 +1,49 @@
+package weed_server
+
+import (
+ "context"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type QueueServerOption struct {
+ Filers []string
+ DefaultReplication string
+ MaxMB int
+ Port int
+}
+
+type QueueServer struct {
+ option *QueueServerOption
+ grpcDialOption grpc.DialOption
+}
+
+func (q *QueueServer) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) {
+ panic("implement me")
+}
+
+func (q *QueueServer) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) {
+ panic("implement me")
+}
+
+func (q *QueueServer) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error {
+ panic("implement me")
+}
+
+func (q *QueueServer) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error {
+ panic("implement me")
+}
+
+func NewQueueServer(option *QueueServerOption) (qs *QueueServer, err error) {
+
+ qs = &QueueServer{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.queue"),
+ }
+
+ return qs, nil
+}