diff options
Diffstat (limited to 'weed/util')
| -rw-r--r-- | weed/util/bytes.go | 45 | ||||
| -rw-r--r-- | weed/util/bytes_pool.go | 127 | ||||
| -rw-r--r-- | weed/util/bytes_pool_test.go | 41 | ||||
| -rw-r--r-- | weed/util/concurrent_read_map.go | 60 | ||||
| -rw-r--r-- | weed/util/config.go | 130 | ||||
| -rw-r--r-- | weed/util/constants.go | 5 | ||||
| -rw-r--r-- | weed/util/file_util.go | 38 | ||||
| -rw-r--r-- | weed/util/http_util.go | 163 | ||||
| -rw-r--r-- | weed/util/net_timeout.go | 81 | ||||
| -rw-r--r-- | weed/util/parse.go | 26 |
10 files changed, 716 insertions, 0 deletions
diff --git a/weed/util/bytes.go b/weed/util/bytes.go new file mode 100644 index 000000000..dfa4ae665 --- /dev/null +++ b/weed/util/bytes.go @@ -0,0 +1,45 @@ +package util + +// big endian + +func BytesToUint64(b []byte) (v uint64) { + length := uint(len(b)) + for i := uint(0); i < length-1; i++ { + v += uint64(b[i]) + v <<= 8 + } + v += uint64(b[length-1]) + return +} +func BytesToUint32(b []byte) (v uint32) { + length := uint(len(b)) + for i := uint(0); i < length-1; i++ { + v += uint32(b[i]) + v <<= 8 + } + v += uint32(b[length-1]) + return +} +func BytesToUint16(b []byte) (v uint16) { + v += uint16(b[0]) + v <<= 8 + v += uint16(b[1]) + return +} +func Uint64toBytes(b []byte, v uint64) { + for i := uint(0); i < 8; i++ { + b[7-i] = byte(v >> (i * 8)) + } +} +func Uint32toBytes(b []byte, v uint32) { + for i := uint(0); i < 4; i++ { + b[3-i] = byte(v >> (i * 8)) + } +} +func Uint16toBytes(b []byte, v uint16) { + b[0] = byte(v >> 8) + b[1] = byte(v) +} +func Uint8toBytes(b []byte, v uint8) { + b[0] = byte(v) +} diff --git a/weed/util/bytes_pool.go b/weed/util/bytes_pool.go new file mode 100644 index 000000000..58ed6feca --- /dev/null +++ b/weed/util/bytes_pool.go @@ -0,0 +1,127 @@ +package util + +import ( + "bytes" + "fmt" + "sync" + "sync/atomic" + "time" +) + +var ( + ChunkSizes = []int{ + 1 << 4, // index 0, 16 bytes, inclusive + 1 << 6, // index 1, 64 bytes + 1 << 8, // index 2, 256 bytes + 1 << 10, // index 3, 1K bytes + 1 << 12, // index 4, 4K bytes + 1 << 14, // index 5, 16K bytes + 1 << 16, // index 6, 64K bytes + 1 << 18, // index 7, 256K bytes + 1 << 20, // index 8, 1M bytes + 1 << 22, // index 9, 4M bytes + 1 << 24, // index 10, 16M bytes + 1 << 26, // index 11, 64M bytes + 1 << 28, // index 12, 128M bytes + } + + _DEBUG = false +) + +type BytesPool struct { + chunkPools []*byteChunkPool +} + +func NewBytesPool() *BytesPool { + var bp BytesPool + for _, size := range ChunkSizes { + bp.chunkPools = append(bp.chunkPools, newByteChunkPool(size)) + } + ret := &bp + if _DEBUG { + t := time.NewTicker(10 * time.Second) + go func() { + for { + println("buffer:", ret.String()) + <-t.C + } + }() + } + return ret +} + +func (m *BytesPool) String() string { + var buf bytes.Buffer + for index, size := range ChunkSizes { + if m.chunkPools[index].count > 0 { + buf.WriteString(fmt.Sprintf("size:%d count:%d\n", size, m.chunkPools[index].count)) + } + } + return buf.String() +} + +func findChunkPoolIndex(size int) int { + if size <= 0 { + return -1 + } + size = (size - 1) >> 4 + ret := 0 + for size > 0 { + size = size >> 2 + ret = ret + 1 + } + if ret >= len(ChunkSizes) { + return -1 + } + return ret +} + +func (m *BytesPool) Get(size int) []byte { + index := findChunkPoolIndex(size) + // println("get index:", index) + if index < 0 { + return make([]byte, size) + } + return m.chunkPools[index].Get() +} + +func (m *BytesPool) Put(b []byte) { + index := findChunkPoolIndex(len(b)) + // println("put index:", index) + if index < 0 { + return + } + m.chunkPools[index].Put(b) +} + +// a pool of fix-sized []byte chunks. The pool size is managed by Go GC +type byteChunkPool struct { + sync.Pool + chunkSizeLimit int + count int64 +} + +var count int + +func newByteChunkPool(chunkSizeLimit int) *byteChunkPool { + var m byteChunkPool + m.chunkSizeLimit = chunkSizeLimit + m.Pool.New = func() interface{} { + count++ + // println("creating []byte size", m.chunkSizeLimit, "new", count, "count", m.count) + return make([]byte, m.chunkSizeLimit) + } + return &m +} + +func (m *byteChunkPool) Get() []byte { + // println("before get size:", m.chunkSizeLimit, "count:", m.count) + atomic.AddInt64(&m.count, 1) + return m.Pool.Get().([]byte) +} + +func (m *byteChunkPool) Put(b []byte) { + atomic.AddInt64(&m.count, -1) + // println("after put get size:", m.chunkSizeLimit, "count:", m.count) + m.Pool.Put(b) +} diff --git a/weed/util/bytes_pool_test.go b/weed/util/bytes_pool_test.go new file mode 100644 index 000000000..3f37c16cf --- /dev/null +++ b/weed/util/bytes_pool_test.go @@ -0,0 +1,41 @@ +package util + +import ( + "testing" +) + +func TestTTLReadWrite(t *testing.T) { + var tests = []struct { + n int // input + expected int // expected result + }{ + {0, -1}, + {1, 0}, + {1 << 4, 0}, + {1 << 6, 1}, + {1 << 8, 2}, + {1 << 10, 3}, + {1 << 12, 4}, + {1 << 14, 5}, + {1 << 16, 6}, + {1 << 18, 7}, + {1<<4 + 1, 1}, + {1<<6 + 1, 2}, + {1<<8 + 1, 3}, + {1<<10 + 1, 4}, + {1<<12 + 1, 5}, + {1<<14 + 1, 6}, + {1<<16 + 1, 7}, + {1<<18 + 1, 8}, + {1<<28 - 1, 12}, + {1 << 28, 12}, + {1<<28 + 2134, -1}, + {1080, 4}, + } + for _, tt := range tests { + actual := findChunkPoolIndex(tt.n) + if actual != tt.expected { + t.Errorf("findChunkPoolIndex(%d): expected %d, actual %d", tt.n, tt.expected, actual) + } + } +} diff --git a/weed/util/concurrent_read_map.go b/weed/util/concurrent_read_map.go new file mode 100644 index 000000000..28b6ae0f1 --- /dev/null +++ b/weed/util/concurrent_read_map.go @@ -0,0 +1,60 @@ +package util + +import ( + "sync" +) + +// A mostly for read map, which can thread-safely +// initialize the map entries. +type ConcurrentReadMap struct { + sync.RWMutex + + items map[string]interface{} +} + +func NewConcurrentReadMap() *ConcurrentReadMap { + return &ConcurrentReadMap{items: make(map[string]interface{})} +} + +func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) { + m.Lock() + defer m.Unlock() + if value, ok := m.items[key]; ok { + return value + } + value = newEntry() + m.items[key] = value + return value +} + +func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} { + m.RLock() + if value, ok := m.items[key]; ok { + m.RUnlock() + return value + } + m.RUnlock() + return m.initMapEntry(key, newEntry) +} + +func (m *ConcurrentReadMap) Find(key string) (interface{}, bool) { + m.RLock() + value, ok := m.items[key] + m.RUnlock() + return value, ok +} + +func (m *ConcurrentReadMap) Items() (itemsCopy []interface{}) { + m.RLock() + for _, i := range m.items { + itemsCopy = append(itemsCopy, i) + } + m.RUnlock() + return itemsCopy +} + +func (m *ConcurrentReadMap) Delete(key string) { + m.Lock() + delete(m.items, key) + m.Unlock() +} diff --git a/weed/util/config.go b/weed/util/config.go new file mode 100644 index 000000000..e4549c322 --- /dev/null +++ b/weed/util/config.go @@ -0,0 +1,130 @@ +package util + +// Copyright 2011 Numerotron Inc. +// Use of this source code is governed by an MIT-style license +// that can be found in the LICENSE file. +// +// Developed at www.stathat.com by Patrick Crosby +// Contact us on twitter with any questions: twitter.com/stat_hat + +// The jconfig package provides a simple, basic configuration file parser using JSON. + +import ( + "bytes" + "encoding/json" + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type Config struct { + data map[string]interface{} + filename string +} + +func newConfig() *Config { + result := new(Config) + result.data = make(map[string]interface{}) + return result +} + +// Loads config information from a JSON file +func LoadConfig(filename string) *Config { + result := newConfig() + result.filename = filename + err := result.parse() + if err != nil { + glog.Fatalf("error loading config file %s: %s", filename, err) + } + return result +} + +// Loads config information from a JSON string +func LoadConfigString(s string) *Config { + result := newConfig() + err := json.Unmarshal([]byte(s), &result.data) + if err != nil { + glog.Fatalf("error parsing config string %s: %s", s, err) + } + return result +} + +func (c *Config) StringMerge(s string) { + next := LoadConfigString(s) + c.merge(next.data) +} + +func (c *Config) LoadMerge(filename string) { + next := LoadConfig(filename) + c.merge(next.data) +} + +func (c *Config) merge(ndata map[string]interface{}) { + for k, v := range ndata { + c.data[k] = v + } +} + +func (c *Config) parse() error { + f, err := os.Open(c.filename) + if err != nil { + return err + } + defer f.Close() + b := new(bytes.Buffer) + _, err = b.ReadFrom(f) + if err != nil { + return err + } + err = json.Unmarshal(b.Bytes(), &c.data) + if err != nil { + return err + } + + return nil +} + +// Returns a string for the config variable key +func (c *Config) GetString(key string) string { + result, present := c.data[key] + if !present { + return "" + } + return result.(string) +} + +// Returns an int for the config variable key +func (c *Config) GetInt(key string) int { + x, ok := c.data[key] + if !ok { + return -1 + } + return int(x.(float64)) +} + +// Returns a float for the config variable key +func (c *Config) GetFloat(key string) float64 { + x, ok := c.data[key] + if !ok { + return -1 + } + return x.(float64) +} + +// Returns a bool for the config variable key +func (c *Config) GetBool(key string) bool { + x, ok := c.data[key] + if !ok { + return false + } + return x.(bool) +} + +// Returns an array for the config variable key +func (c *Config) GetArray(key string) []interface{} { + result, present := c.data[key] + if !present { + return []interface{}(nil) + } + return result.([]interface{}) +} diff --git a/weed/util/constants.go b/weed/util/constants.go new file mode 100644 index 000000000..6b6b0b911 --- /dev/null +++ b/weed/util/constants.go @@ -0,0 +1,5 @@ +package util + +const ( + VERSION = "0.71 beta" +) diff --git a/weed/util/file_util.go b/weed/util/file_util.go new file mode 100644 index 000000000..a39fb0860 --- /dev/null +++ b/weed/util/file_util.go @@ -0,0 +1,38 @@ +package util + +import ( + "bufio" + "errors" + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func TestFolderWritable(folder string) (err error) { + fileInfo, err := os.Stat(folder) + if err != nil { + return err + } + if !fileInfo.IsDir() { + return errors.New("Not a valid folder!") + } + perm := fileInfo.Mode().Perm() + glog.V(0).Infoln("Folder", folder, "Permission:", perm) + if 0200&perm != 0 { + return nil + } + return errors.New("Not writable!") +} + +func Readln(r *bufio.Reader) ([]byte, error) { + var ( + isPrefix = true + err error + line, ln []byte + ) + for isPrefix && err == nil { + line, isPrefix, err = r.ReadLine() + ln = append(ln, line...) + } + return ln, err +} diff --git a/weed/util/http_util.go b/weed/util/http_util.go new file mode 100644 index 000000000..a54fc8779 --- /dev/null +++ b/weed/util/http_util.go @@ -0,0 +1,163 @@ +package util + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "strings" + + "github.com/chrislusf/seaweedfs/weed/security" +) + +var ( + client *http.Client + Transport *http.Transport +) + +func init() { + Transport = &http.Transport{ + MaxIdleConnsPerHost: 1024, + } + client = &http.Client{Transport: Transport} +} + +func PostBytes(url string, body []byte) ([]byte, error) { + r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("Post to %s: %v", url, err) + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, fmt.Errorf("Read response body: %v", err) + } + return b, nil +} + +func Post(url string, values url.Values) ([]byte, error) { + r, err := client.PostForm(url, values) + if err != nil { + return nil, err + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + return b, nil +} + +func Get(url string) ([]byte, error) { + r, err := client.Get(url) + if err != nil { + return nil, err + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if r.StatusCode != 200 { + return nil, fmt.Errorf("%s: %s", url, r.Status) + } + if err != nil { + return nil, err + } + return b, nil +} + +func Delete(url string, jwt security.EncodedJwt) error { + req, err := http.NewRequest("DELETE", url, nil) + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } + if err != nil { + return err + } + resp, e := client.Do(req) + if e != nil { + return e + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + switch resp.StatusCode { + case http.StatusNotFound, http.StatusAccepted, http.StatusOK: + return nil + } + m := make(map[string]interface{}) + if e := json.Unmarshal(body, m); e == nil { + if s, ok := m["error"].(string); ok { + return errors.New(s) + } + } + return errors.New(string(body)) +} + +func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error { + r, err := client.PostForm(url, values) + if err != nil { + return err + } + defer r.Body.Close() + if r.StatusCode != 200 { + return fmt.Errorf("%s: %s", url, r.Status) + } + bufferSize := len(allocatedBytes) + for { + n, err := r.Body.Read(allocatedBytes) + if n == bufferSize { + eachBuffer(allocatedBytes) + } + if err != nil { + if err == io.EOF { + return nil + } + return err + } + } + return nil +} + +func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error { + r, err := client.PostForm(url, values) + if err != nil { + return err + } + defer r.Body.Close() + if r.StatusCode != 200 { + return fmt.Errorf("%s: %s", url, r.Status) + } + return readFn(r.Body) +} + +func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) { + response, err := client.Get(fileUrl) + if err != nil { + return "", nil, err + } + contentDisposition := response.Header["Content-Disposition"] + if len(contentDisposition) > 0 { + if strings.HasPrefix(contentDisposition[0], "filename=") { + filename = contentDisposition[0][len("filename="):] + filename = strings.Trim(filename, "\"") + } + } + rc = response.Body + return +} + +func Do(req *http.Request) (resp *http.Response, err error) { + return client.Do(req) +} + +func NormalizeUrl(url string) string { + if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") { + return url + } + return "http://" + url +} diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go new file mode 100644 index 000000000..f46776992 --- /dev/null +++ b/weed/util/net_timeout.go @@ -0,0 +1,81 @@ +package util + +import ( + "net" + "time" + + "github.com/chrislusf/seaweedfs/weed/stats" +) + +// Listener wraps a net.Listener, and gives a place to store the timeout +// parameters. On Accept, it will wrap the net.Conn with our own Conn for us. +type Listener struct { + net.Listener + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +func (l *Listener) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + stats.ConnectionOpen() + tc := &Conn{ + Conn: c, + ReadTimeout: l.ReadTimeout, + WriteTimeout: l.WriteTimeout, + } + return tc, nil +} + +// Conn wraps a net.Conn, and sets a deadline for every read +// and write operation. +type Conn struct { + net.Conn + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +func (c *Conn) Read(b []byte) (count int, e error) { + err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)) + if err != nil { + return 0, err + } + count, e = c.Conn.Read(b) + if e == nil { + stats.BytesIn(int64(count)) + } + return +} + +func (c *Conn) Write(b []byte) (count int, e error) { + err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) + if err != nil { + return 0, err + } + count, e = c.Conn.Write(b) + if e == nil { + stats.BytesOut(int64(count)) + } + return +} + +func (c *Conn) Close() error { + stats.ConnectionClose() + return c.Conn.Close() +} + +func NewListener(addr string, timeout time.Duration) (net.Listener, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + + tl := &Listener{ + Listener: l, + ReadTimeout: timeout, + WriteTimeout: timeout, + } + return tl, nil +} diff --git a/weed/util/parse.go b/weed/util/parse.go new file mode 100644 index 000000000..0a8317c19 --- /dev/null +++ b/weed/util/parse.go @@ -0,0 +1,26 @@ +package util + +import ( + "strconv" +) + +func ParseInt(text string, defaultValue int) int { + count, parseError := strconv.ParseInt(text, 10, 64) + if parseError != nil { + if len(text) > 0 { + return 0 + } + return defaultValue + } + return int(count) +} +func ParseUint64(text string, defaultValue uint64) uint64 { + count, parseError := strconv.ParseUint(text, 10, 64) + if parseError != nil { + if len(text) > 0 { + return 0 + } + return defaultValue + } + return count +} |
