diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 3 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 9 | ||||
| -rw-r--r-- | weed/server/queue_server.go | 49 |
3 files changed, 56 insertions, 5 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 30a5cc9de..35539acca 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -336,6 +336,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. Collection: fs.option.Collection, Replication: fs.option.DefaultReplication, MaxMb: uint32(fs.option.MaxMB), - DirBuckets: fs.option.DirBucketsPath, + DirBuckets: fs.filer.DirBucketsPath, + DirQueues: fs.filer.DirQueuesPath, }, nil } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 5fc038e17..bfb182dbe 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -46,7 +46,6 @@ type FilerOption struct { DisableHttp bool Port int recursiveDelete bool - DirBucketsPath string } type FilerServer struct { @@ -67,7 +66,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.DirBucketsPath) + fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption) go fs.filer.KeepConnectedToMaster() @@ -84,7 +83,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete") v.Set("filer.option.buckets_folder", "/buckets") - fs.option.DirBucketsPath = v.GetString("filer.option.buckets_folder") + v.Set("filer.option.queues_folder", "/queues") + fs.filer.DirBucketsPath = v.GetString("filer.option.buckets_folder") + fs.filer.DirQueuesPath = v.GetString("filer.option.queues_folder") fs.filer.LoadConfiguration(v) notification.LoadConfiguration(v, "notification.") @@ -97,7 +98,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - fs.filer.LoadBuckets(fs.option.DirBucketsPath) + fs.filer.LoadBuckets(fs.filer.DirBucketsPath) maybeStartMetrics(fs, option) diff --git a/weed/server/queue_server.go b/weed/server/queue_server.go new file mode 100644 index 000000000..078c76a30 --- /dev/null +++ b/weed/server/queue_server.go @@ -0,0 +1,49 @@ +package weed_server + +import ( + "context" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type QueueServerOption struct { + Filers []string + DefaultReplication string + MaxMB int + Port int +} + +type QueueServer struct { + option *QueueServerOption + grpcDialOption grpc.DialOption +} + +func (q *QueueServer) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) { + panic("implement me") +} + +func (q *QueueServer) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) { + panic("implement me") +} + +func (q *QueueServer) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error { + panic("implement me") +} + +func (q *QueueServer) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error { + panic("implement me") +} + +func NewQueueServer(option *QueueServerOption) (qs *QueueServer, err error) { + + qs = &QueueServer{ + option: option, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.queue"), + } + + return qs, nil +} |
