1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
package weed_server
import (
"math/rand"
"net/http"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
//"net/http/pprof"
)
type VolumeServer struct {
masterNode string
mnLock sync.RWMutex
pulseSeconds int
dataCenter string
rack string
store *storage.Store
read_guard *security.Guard
write_guard *security.Guard
needleMapKind storage.NeedleMapType
FixJpgOrientation bool
ReadRedirect bool
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
folders []string, maxCounts []int,
needleMapKind storage.NeedleMapType,
masterNode string, pulseSeconds int,
dataCenter string, rack string,
ipReadWhiteList []string,
ipWriteWhiteList []string,
rootWhiteList []string,
fixJpgOrientation bool,
readRedirect bool) *VolumeServer {
vs := &VolumeServer{
pulseSeconds: pulseSeconds,
dataCenter: dataCenter,
rack: rack,
needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect,
}
vs.SetMasterNode(masterNode)
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
vs.read_guard = security.NewGuard(ipReadWhiteList, rootWhiteList, "")
vs.write_guard = security.NewGuard(ipWriteWhiteList, rootWhiteList, "")
adminMux.HandleFunc("/ui/index.html", vs.read_guard.WhiteList(vs.uiStatusHandler))
adminMux.HandleFunc("/status", vs.read_guard.WhiteList(vs.statusHandler))
adminMux.HandleFunc("/admin/assign_volume", vs.write_guard.WhiteList(vs.assignVolumeHandler))
adminMux.HandleFunc("/admin/vacuum/check", vs.write_guard.WhiteList(vs.vacuumVolumeCheckHandler))
adminMux.HandleFunc("/admin/vacuum/compact", vs.write_guard.WhiteList(vs.vacuumVolumeCompactHandler))
adminMux.HandleFunc("/admin/vacuum/commit", vs.write_guard.WhiteList(vs.vacuumVolumeCommitHandler))
adminMux.HandleFunc("/admin/delete_collection", vs.write_guard.WhiteList(vs.deleteCollectionHandler))
adminMux.HandleFunc("/admin/sync/status", vs.read_guard.WhiteList(vs.getVolumeSyncStatusHandler))
adminMux.HandleFunc("/admin/sync/index", vs.write_guard.WhiteList(vs.getVolumeIndexContentHandler))
adminMux.HandleFunc("/admin/sync/data", vs.write_guard.WhiteList(vs.getVolumeDataContentHandler))
adminMux.HandleFunc("/stats/counter", vs.read_guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.read_guard.WhiteList(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.read_guard.WhiteList(vs.statsDiskHandler))
adminMux.HandleFunc("/delete", vs.write_guard.WhiteList(vs.batchDeleteHandler))
adminMux.HandleFunc("/", vs.read_guard.WhiteList(vs.privateStoreHandler))
if publicMux != adminMux {
// separated admin and public port
publicMux.HandleFunc("/favicon.ico", vs.faviconHandler)
publicMux.HandleFunc("/", vs.publicReadOnlyHandler)
}
/*
// add in profiling support
adminMux.HandleFunc("/debug/pprof/", pprof.Index)
adminMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
adminMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
adminMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
adminMux.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
adminMux.Handle("/debug/pprof/heap", pprof.Handler("heap"))
adminMux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
adminMux.Handle("/debug/pprof/block", pprof.Handler("block"))
*/
go func() {
connected := true
glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode())
vs.store.SetBootstrapMaster(vs.GetMasterNode())
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
for {
glog.V(4).Infof("Volume server sending to master %s", vs.GetMasterNode())
master, secretKey, err := vs.store.SendHeartbeatToMaster()
if err == nil {
if !connected {
connected = true
vs.SetMasterNode(master)
vs.read_guard.SecretKey = secretKey
vs.write_guard.SecretKey = secretKey
glog.V(0).Infoln("Volume Server Connected with master at", master)
}
} else {
glog.V(1).Infof("Volume Server Failed to talk with master %s: %v", vs.masterNode, err)
if connected {
connected = false
}
}
if connected {
time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond)
} else {
time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*0.25) * time.Millisecond)
}
}
}()
return vs
}
func (vs *VolumeServer) GetMasterNode() string {
vs.mnLock.RLock()
defer vs.mnLock.RUnlock()
return vs.masterNode
}
func (vs *VolumeServer) SetMasterNode(masterNode string) {
vs.mnLock.Lock()
defer vs.mnLock.Unlock()
vs.masterNode = masterNode
}
func (vs *VolumeServer) Shutdown() {
glog.V(0).Infoln("Shutting down volume server...")
vs.store.Close()
glog.V(0).Infoln("Shut down successfully!")
}
func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt {
return security.GenJwt(vs.read_guard.SecretKey, fileId)
}
|