diff options
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/filer.go | 11 | ||||
| -rw-r--r-- | weed/filer/meta_aggregator.go | 16 | ||||
| -rw-r--r-- | weed/filer/remote_mapping.go | 2 | ||||
| -rw-r--r-- | weed/filer/remote_storage.go | 4 |
4 files changed, 17 insertions, 16 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 1a20abefc..f13782031 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "os" "strings" "time" @@ -45,10 +46,10 @@ type Filer struct { RemoteStorage *FilerRemoteStorage } -func NewFiler(masters []string, grpcDialOption grpc.DialOption, - filerHost string, filerGrpcPort uint32, collection string, replication string, dataCenter string, notifyFn func()) *Filer { +func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, + filerHost pb.ServerAddress, collection string, replication string, dataCenter string, notifyFn func()) *Filer { f := &Filer{ - MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, dataCenter, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), @@ -63,7 +64,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, return f } -func (f *Filer) AggregateFromPeers(self string, filers []string) { +func (f *Filer) AggregateFromPeers(self pb.ServerAddress, filers []pb.ServerAddress) { // set peers found := false @@ -110,7 +111,7 @@ func (f *Filer) GetStore() (store FilerStore) { return f.Store } -func (fs *Filer) GetMaster() string { +func (fs *Filer) GetMaster() pb.ServerAddress { return fs.MasterClient.GetMaster() } diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 913cbd454..008fd33a7 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -18,7 +18,7 @@ import ( ) type MetaAggregator struct { - filers []string + filers []pb.ServerAddress grpcDialOption grpc.DialOption MetaLogBuffer *log_buffer.LogBuffer // notifying clients @@ -28,7 +28,7 @@ type MetaAggregator struct { // MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. // The old data comes from what each LocalMetadata persisted on disk. -func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator { +func NewMetaAggregator(filers []pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator { t := &MetaAggregator{ filers: filers, grpcDialOption: grpcDialOption, @@ -40,13 +40,13 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg return t } -func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) { +func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self pb.ServerAddress) { for _, filer := range ma.filers { go ma.subscribeToOneFiler(f, self, filer) } } -func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string) { +func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) { /* Each filer reads the "filer.store.id", which is the store's signature when filer starts. @@ -123,7 +123,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "filer:" + self, + ClientName: "filer:" + string(self), PathPrefix: "/", SinceNs: lastTsNs, }) @@ -156,7 +156,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string } } -func (ma *MetaAggregator) readFilerStoreSignature(peer string) (sig int32, err error) { +func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) { err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { @@ -172,7 +172,7 @@ const ( MetaOffsetPrefix = "Meta" ) -func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) (lastTsNs int64, err error) { +func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) { key := []byte(MetaOffsetPrefix + "xxxx") util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) @@ -195,7 +195,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) return } -func (ma *MetaAggregator) updateOffset(f *Filer, peer string, peerSignature int32, lastTsNs int64) (err error) { +func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) { key := []byte(MetaOffsetPrefix + "xxxx") util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go index fb74dca98..c95c4e5bd 100644 --- a/weed/filer/remote_mapping.go +++ b/weed/filer/remote_mapping.go @@ -9,7 +9,7 @@ import ( "google.golang.org/grpc" ) -func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *remote_pb.RemoteStorageMapping, readErr error) { +func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) { var oldContent []byte if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) diff --git a/weed/filer/remote_storage.go b/weed/filer/remote_storage.go index 4ff21f3b3..9d682b698 100644 --- a/weed/filer/remote_storage.go +++ b/weed/filer/remote_storage.go @@ -131,7 +131,7 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.Remo return } -func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *remote_pb.RemoteConf, readErr error) { +func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) { var oldContent []byte if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) @@ -150,7 +150,7 @@ func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, return } -func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress string, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) { +func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) { mappings, listErr := ReadMountMappings(grpcDialOption, filerAddress) if listErr != nil { |
