path: root/weed/util
Diffstat (limited to 'weed/util')
-rw-r--r--   weed/util/bytes.go                 45
-rw-r--r--   weed/util/bytes_pool.go           127
-rw-r--r--   weed/util/bytes_pool_test.go       41
-rw-r--r--   weed/util/concurrent_read_map.go   60
-rw-r--r--   weed/util/config.go               130
-rw-r--r--   weed/util/constants.go              5
-rw-r--r--   weed/util/file_util.go             38
-rw-r--r--   weed/util/http_util.go            163
-rw-r--r--   weed/util/net_timeout.go           81
-rw-r--r--   weed/util/parse.go                 26
10 files changed, 716 insertions, 0 deletions
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
new file mode 100644
index 000000000..dfa4ae665
--- /dev/null
+++ b/weed/util/bytes.go
@@ -0,0 +1,45 @@
+package util
+
+// big endian
+
+func BytesToUint64(b []byte) (v uint64) {
+ length := uint(len(b))
+ for i := uint(0); i < length-1; i++ {
+ v += uint64(b[i])
+ v <<= 8
+ }
+ v += uint64(b[length-1])
+ return
+}
+func BytesToUint32(b []byte) (v uint32) {
+ length := uint(len(b))
+ for i := uint(0); i < length-1; i++ {
+ v += uint32(b[i])
+ v <<= 8
+ }
+ v += uint32(b[length-1])
+ return
+}
+func BytesToUint16(b []byte) (v uint16) {
+ v += uint16(b[0])
+ v <<= 8
+ v += uint16(b[1])
+ return
+}
+func Uint64toBytes(b []byte, v uint64) {
+ for i := uint(0); i < 8; i++ {
+ b[7-i] = byte(v >> (i * 8))
+ }
+}
+func Uint32toBytes(b []byte, v uint32) {
+ for i := uint(0); i < 4; i++ {
+ b[3-i] = byte(v >> (i * 8))
+ }
+}
+func Uint16toBytes(b []byte, v uint16) {
+ b[0] = byte(v >> 8)
+ b[1] = byte(v)
+}
+func Uint8toBytes(b []byte, v uint8) {
+ b[0] = byte(v)
+}
diff --git a/weed/util/bytes_pool.go b/weed/util/bytes_pool.go
new file mode 100644
index 000000000..58ed6feca
--- /dev/null
+++ b/weed/util/bytes_pool.go
@@ -0,0 +1,127 @@
+package util
+
+import (
+ "bytes"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+var (
+ ChunkSizes = []int{
+ 1 << 4, // index 0, 16 bytes, inclusive
+ 1 << 6, // index 1, 64 bytes
+ 1 << 8, // index 2, 256 bytes
+ 1 << 10, // index 3, 1K bytes
+ 1 << 12, // index 4, 4K bytes
+ 1 << 14, // index 5, 16K bytes
+ 1 << 16, // index 6, 64K bytes
+ 1 << 18, // index 7, 256K bytes
+ 1 << 20, // index 8, 1M bytes
+ 1 << 22, // index 9, 4M bytes
+ 1 << 24, // index 10, 16M bytes
+ 1 << 26, // index 11, 64M bytes
+ 1 << 28, // index 12, 256M bytes
+ }
+
+ _DEBUG = false
+)
+
+type BytesPool struct {
+ chunkPools []*byteChunkPool
+}
+
+func NewBytesPool() *BytesPool {
+ var bp BytesPool
+ for _, size := range ChunkSizes {
+ bp.chunkPools = append(bp.chunkPools, newByteChunkPool(size))
+ }
+ ret := &bp
+ if _DEBUG {
+ t := time.NewTicker(10 * time.Second)
+ go func() {
+ for {
+ println("buffer:", ret.String())
+ <-t.C
+ }
+ }()
+ }
+ return ret
+}
+
+func (m *BytesPool) String() string {
+ var buf bytes.Buffer
+ for index, size := range ChunkSizes {
+ if m.chunkPools[index].count > 0 {
+ buf.WriteString(fmt.Sprintf("size:%d count:%d\n", size, m.chunkPools[index].count))
+ }
+ }
+ return buf.String()
+}
+
+func findChunkPoolIndex(size int) int {
+ if size <= 0 {
+ return -1
+ }
+ size = (size - 1) >> 4
+ ret := 0
+ for size > 0 {
+ size = size >> 2
+ ret = ret + 1
+ }
+ if ret >= len(ChunkSizes) {
+ return -1
+ }
+ return ret
+}
+
+func (m *BytesPool) Get(size int) []byte {
+ index := findChunkPoolIndex(size)
+ // println("get index:", index)
+ if index < 0 {
+ return make([]byte, size)
+ }
+ return m.chunkPools[index].Get()
+}
+
+func (m *BytesPool) Put(b []byte) {
+ index := findChunkPoolIndex(len(b))
+ // println("put index:", index)
+ if index < 0 {
+ return
+ }
+ m.chunkPools[index].Put(b)
+}
+
+// a pool of fixed-size []byte chunks. The pool size is managed by the Go GC
+type byteChunkPool struct {
+ sync.Pool
+ chunkSizeLimit int
+ count int64
+}
+
+var count int
+
+func newByteChunkPool(chunkSizeLimit int) *byteChunkPool {
+ var m byteChunkPool
+ m.chunkSizeLimit = chunkSizeLimit
+ m.Pool.New = func() interface{} {
+ count++
+ // println("creating []byte size", m.chunkSizeLimit, "new", count, "count", m.count)
+ return make([]byte, m.chunkSizeLimit)
+ }
+ return &m
+}
+
+func (m *byteChunkPool) Get() []byte {
+ // println("before get size:", m.chunkSizeLimit, "count:", m.count)
+ atomic.AddInt64(&m.count, 1)
+ return m.Pool.Get().([]byte)
+}
+
+func (m *byteChunkPool) Put(b []byte) {
+ atomic.AddInt64(&m.count, -1)
+ // println("after put get size:", m.chunkSizeLimit, "count:", m.count)
+ m.Pool.Put(b)
+}
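A usage sketch for the pool, assuming a caller that knows how many bytes it needs: Get rounds the request up to the owning bucket's chunk size, and Put hands the chunk back for reuse.

package main

import "github.com/chrislusf/seaweedfs/weed/util"

func main() {
	pool := util.NewBytesPool()

	buf := pool.Get(1000) // lands in the index-3 (1K) bucket, so len(buf) == 1024
	copy(buf, []byte("payload"))
	// ... use buf[:1000] ...
	pool.Put(buf) // return the chunk so later Gets can reuse it

	// Requests larger than the largest bucket (256M) fall back to a plain
	// make([]byte, n) and are never pooled.
}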
diff --git a/weed/util/bytes_pool_test.go b/weed/util/bytes_pool_test.go
new file mode 100644
index 000000000..3f37c16cf
--- /dev/null
+++ b/weed/util/bytes_pool_test.go
@@ -0,0 +1,41 @@
+package util
+
+import (
+ "testing"
+)
+
+func TestFindChunkPoolIndex(t *testing.T) {
+ var tests = []struct {
+ n int // input
+ expected int // expected result
+ }{
+ {0, -1},
+ {1, 0},
+ {1 << 4, 0},
+ {1 << 6, 1},
+ {1 << 8, 2},
+ {1 << 10, 3},
+ {1 << 12, 4},
+ {1 << 14, 5},
+ {1 << 16, 6},
+ {1 << 18, 7},
+ {1<<4 + 1, 1},
+ {1<<6 + 1, 2},
+ {1<<8 + 1, 3},
+ {1<<10 + 1, 4},
+ {1<<12 + 1, 5},
+ {1<<14 + 1, 6},
+ {1<<16 + 1, 7},
+ {1<<18 + 1, 8},
+ {1<<28 - 1, 12},
+ {1 << 28, 12},
+ {1<<28 + 2134, -1},
+ {1080, 4},
+ }
+ for _, tt := range tests {
+ actual := findChunkPoolIndex(tt.n)
+ if actual != tt.expected {
+ t.Errorf("findChunkPoolIndex(%d): expected %d, actual %d", tt.n, tt.expected, actual)
+ }
+ }
+}
diff --git a/weed/util/concurrent_read_map.go b/weed/util/concurrent_read_map.go
new file mode 100644
index 000000000..28b6ae0f1
--- /dev/null
+++ b/weed/util/concurrent_read_map.go
@@ -0,0 +1,60 @@
+package util
+
+import (
+ "sync"
+)
+
+// A mostly-read map that can thread-safely and lazily
+// initialize its map entries.
+type ConcurrentReadMap struct {
+ sync.RWMutex
+
+ items map[string]interface{}
+}
+
+func NewConcurrentReadMap() *ConcurrentReadMap {
+ return &ConcurrentReadMap{items: make(map[string]interface{})}
+}
+
+func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
+ m.Lock()
+ defer m.Unlock()
+ if value, ok := m.items[key]; ok {
+ return value
+ }
+ value = newEntry()
+ m.items[key] = value
+ return value
+}
+
+func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
+ m.RLock()
+ if value, ok := m.items[key]; ok {
+ m.RUnlock()
+ return value
+ }
+ m.RUnlock()
+ return m.initMapEntry(key, newEntry)
+}
+
+func (m *ConcurrentReadMap) Find(key string) (interface{}, bool) {
+ m.RLock()
+ value, ok := m.items[key]
+ m.RUnlock()
+ return value, ok
+}
+
+func (m *ConcurrentReadMap) Items() (itemsCopy []interface{}) {
+ m.RLock()
+ for _, i := range m.items {
+ itemsCopy = append(itemsCopy, i)
+ }
+ m.RUnlock()
+ return itemsCopy
+}
+
+func (m *ConcurrentReadMap) Delete(key string) {
+ m.Lock()
+ delete(m.items, key)
+ m.Unlock()
+}
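A sketch of the intended read-mostly usage; the key and constructor below are illustrative only. Get takes only a read lock on the hot path and upgrades to a write lock just to initialize a missing entry.

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	m := util.NewConcurrentReadMap()

	// The constructor runs only if "collection1" is not present yet,
	// even when several goroutines race on the same key.
	v := m.Get("collection1", func() interface{} { return make([]string, 0) })
	fmt.Println(v)

	if _, found := m.Find("collection2"); !found {
		fmt.Println("collection2 not present")
	}
	m.Delete("collection1")
}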
diff --git a/weed/util/config.go b/weed/util/config.go
new file mode 100644
index 000000000..e4549c322
--- /dev/null
+++ b/weed/util/config.go
@@ -0,0 +1,130 @@
+package util
+
+// Copyright 2011 Numerotron Inc.
+// Use of this source code is governed by an MIT-style license
+// that can be found in the LICENSE file.
+//
+// Developed at www.stathat.com by Patrick Crosby
+// Contact us on twitter with any questions: twitter.com/stat_hat
+
+// Adapted from the jconfig package: a simple, basic configuration file parser using JSON.
+
+import (
+ "bytes"
+ "encoding/json"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type Config struct {
+ data map[string]interface{}
+ filename string
+}
+
+func newConfig() *Config {
+ result := new(Config)
+ result.data = make(map[string]interface{})
+ return result
+}
+
+// Loads config information from a JSON file
+func LoadConfig(filename string) *Config {
+ result := newConfig()
+ result.filename = filename
+ err := result.parse()
+ if err != nil {
+ glog.Fatalf("error loading config file %s: %s", filename, err)
+ }
+ return result
+}
+
+// Loads config information from a JSON string
+func LoadConfigString(s string) *Config {
+ result := newConfig()
+ err := json.Unmarshal([]byte(s), &result.data)
+ if err != nil {
+ glog.Fatalf("error parsing config string %s: %s", s, err)
+ }
+ return result
+}
+
+func (c *Config) StringMerge(s string) {
+ next := LoadConfigString(s)
+ c.merge(next.data)
+}
+
+func (c *Config) LoadMerge(filename string) {
+ next := LoadConfig(filename)
+ c.merge(next.data)
+}
+
+func (c *Config) merge(ndata map[string]interface{}) {
+ for k, v := range ndata {
+ c.data[k] = v
+ }
+}
+
+func (c *Config) parse() error {
+ f, err := os.Open(c.filename)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ b := new(bytes.Buffer)
+ _, err = b.ReadFrom(f)
+ if err != nil {
+ return err
+ }
+ err = json.Unmarshal(b.Bytes(), &c.data)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Returns a string for the config variable key
+func (c *Config) GetString(key string) string {
+ result, present := c.data[key]
+ if !present {
+ return ""
+ }
+ return result.(string)
+}
+
+// Returns an int for the config variable key
+func (c *Config) GetInt(key string) int {
+ x, ok := c.data[key]
+ if !ok {
+ return -1
+ }
+ return int(x.(float64))
+}
+
+// Returns a float for the config variable key
+func (c *Config) GetFloat(key string) float64 {
+ x, ok := c.data[key]
+ if !ok {
+ return -1
+ }
+ return x.(float64)
+}
+
+// Returns a bool for the config variable key
+func (c *Config) GetBool(key string) bool {
+ x, ok := c.data[key]
+ if !ok {
+ return false
+ }
+ return x.(bool)
+}
+
+// Returns an array for the config variable key
+func (c *Config) GetArray(key string) []interface{} {
+ result, present := c.data[key]
+ if !present {
+ return []interface{}(nil)
+ }
+ return result.([]interface{})
+}
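A usage sketch for the config type; the JSON document and keys below are made up for illustration. Note the behavior on missing keys: "" for strings, -1 for numbers, false for bools, and a nil slice for arrays.

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	c := util.LoadConfigString(`{"ip": "127.0.0.1", "port": 9333, "debug": true}`)
	fmt.Println(c.GetString("ip"))   // 127.0.0.1
	fmt.Println(c.GetInt("port"))    // 9333 (JSON numbers arrive as float64, then cast)
	fmt.Println(c.GetBool("debug"))  // true
	fmt.Println(c.GetInt("missing")) // -1, the missing-key default
}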
diff --git a/weed/util/constants.go b/weed/util/constants.go
new file mode 100644
index 000000000..6b6b0b911
--- /dev/null
+++ b/weed/util/constants.go
@@ -0,0 +1,5 @@
+package util
+
+const (
+ VERSION = "0.71 beta"
+)
diff --git a/weed/util/file_util.go b/weed/util/file_util.go
new file mode 100644
index 000000000..a39fb0860
--- /dev/null
+++ b/weed/util/file_util.go
@@ -0,0 +1,38 @@
+package util
+
+import (
+ "bufio"
+ "errors"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func TestFolderWritable(folder string) (err error) {
+ fileInfo, err := os.Stat(folder)
+ if err != nil {
+ return err
+ }
+ if !fileInfo.IsDir() {
+ return errors.New("Not a valid folder!")
+ }
+ perm := fileInfo.Mode().Perm()
+ glog.V(0).Infoln("Folder", folder, "Permission:", perm)
+ if 0200&perm != 0 {
+ return nil
+ }
+ return errors.New("Not writable!")
+}
+
+func Readln(r *bufio.Reader) ([]byte, error) {
+ var (
+ isPrefix = true
+ err error
+ line, ln []byte
+ )
+ for isPrefix && err == nil {
+ line, isPrefix, err = r.ReadLine()
+ ln = append(ln, line...)
+ }
+ return ln, err
+}
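A sketch of Readln in a read loop, fed from an in-memory reader so it runs without touching the filesystem; it stitches together partial ReadLine results into full lines and surfaces io.EOF once the input ends.

package main

import (
	"bufio"
	"fmt"
	"strings"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	r := bufio.NewReader(strings.NewReader("first line\nsecond line\n"))
	for {
		line, err := util.Readln(r)
		if err != nil {
			break // io.EOF once the input is exhausted
		}
		fmt.Println(string(line))
	}
}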
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
new file mode 100644
index 000000000..a54fc8779
--- /dev/null
+++ b/weed/util/http_util.go
@@ -0,0 +1,163 @@
+package util
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+var (
+ client *http.Client
+ Transport *http.Transport
+)
+
+func init() {
+ Transport = &http.Transport{
+ MaxIdleConnsPerHost: 1024,
+ }
+ client = &http.Client{Transport: Transport}
+}
+
+func PostBytes(url string, body []byte) ([]byte, error) {
+ r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body))
+ if err != nil {
+ return nil, fmt.Errorf("Post to %s: %v", url, err)
+ }
+ defer r.Body.Close()
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return nil, fmt.Errorf("Read response body: %v", err)
+ }
+ return b, nil
+}
+
+func Post(url string, values url.Values) ([]byte, error) {
+ r, err := client.PostForm(url, values)
+ if err != nil {
+ return nil, err
+ }
+ defer r.Body.Close()
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return nil, err
+ }
+ return b, nil
+}
+
+func Get(url string) ([]byte, error) {
+ r, err := client.Get(url)
+ if err != nil {
+ return nil, err
+ }
+ defer r.Body.Close()
+ b, err := ioutil.ReadAll(r.Body)
+ if r.StatusCode != 200 {
+ return nil, fmt.Errorf("%s: %s", url, r.Status)
+ }
+ if err != nil {
+ return nil, err
+ }
+ return b, nil
+}
+
+func Delete(url string, jwt security.EncodedJwt) error {
+ req, err := http.NewRequest("DELETE", url, nil)
+ if err != nil {
+ return err
+ }
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
+ resp, e := client.Do(req)
+ if e != nil {
+ return e
+ }
+ defer resp.Body.Close()
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ switch resp.StatusCode {
+ case http.StatusNotFound, http.StatusAccepted, http.StatusOK:
+ return nil
+ }
+ m := make(map[string]interface{})
+ if e := json.Unmarshal(body, &m); e == nil {
+ if s, ok := m["error"].(string); ok {
+ return errors.New(s)
+ }
+ }
+ return errors.New(string(body))
+}
+
+func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
+ r, err := client.PostForm(url, values)
+ if err != nil {
+ return err
+ }
+ defer r.Body.Close()
+ if r.StatusCode != 200 {
+ return fmt.Errorf("%s: %s", url, r.Status)
+ }
+ bufferSize := len(allocatedBytes)
+ for {
+ n, err := r.Body.Read(allocatedBytes)
+ if n == bufferSize {
+ eachBuffer(allocatedBytes)
+ }
+ if err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+ }
+ return nil
+}
+
+func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
+ r, err := client.PostForm(url, values)
+ if err != nil {
+ return err
+ }
+ defer r.Body.Close()
+ if r.StatusCode != 200 {
+ return fmt.Errorf("%s: %s", url, r.Status)
+ }
+ return readFn(r.Body)
+}
+
+func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) {
+ response, err := client.Get(fileUrl)
+ if err != nil {
+ return "", nil, err
+ }
+ contentDisposition := response.Header["Content-Disposition"]
+ if len(contentDisposition) > 0 {
+ if strings.HasPrefix(contentDisposition[0], "filename=") {
+ filename = contentDisposition[0][len("filename="):]
+ filename = strings.Trim(filename, "\"")
+ }
+ }
+ rc = response.Body
+ return
+}
+
+func Do(req *http.Request) (resp *http.Response, err error) {
+ return client.Do(req)
+}
+
+func NormalizeUrl(url string) string {
+ if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") {
+ return url
+ }
+ return "http://" + url
+}
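Two small usage sketches: NormalizeUrl is pure and safe to call anywhere, while the Get call assumes a SeaweedFS master reachable at the address below (purely illustrative).

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	fmt.Println(util.NormalizeUrl("localhost:9333"))        // http://localhost:9333
	fmt.Println(util.NormalizeUrl("http://localhost:9333")) // unchanged

	// Assumed endpoint; only succeeds if a master is actually running there.
	if body, err := util.Get("http://localhost:9333/dir/status"); err == nil {
		fmt.Println(string(body))
	}
}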
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go
new file mode 100644
index 000000000..f46776992
--- /dev/null
+++ b/weed/util/net_timeout.go
@@ -0,0 +1,81 @@
+package util
+
+import (
+ "net"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/stats"
+)
+
+// Listener wraps a net.Listener, and gives a place to store the timeout
+// parameters. On Accept, it will wrap the net.Conn with our own Conn for us.
+type Listener struct {
+ net.Listener
+ ReadTimeout time.Duration
+ WriteTimeout time.Duration
+}
+
+func (l *Listener) Accept() (net.Conn, error) {
+ c, err := l.Listener.Accept()
+ if err != nil {
+ return nil, err
+ }
+ stats.ConnectionOpen()
+ tc := &Conn{
+ Conn: c,
+ ReadTimeout: l.ReadTimeout,
+ WriteTimeout: l.WriteTimeout,
+ }
+ return tc, nil
+}
+
+// Conn wraps a net.Conn, and sets a deadline for every read
+// and write operation.
+type Conn struct {
+ net.Conn
+ ReadTimeout time.Duration
+ WriteTimeout time.Duration
+}
+
+func (c *Conn) Read(b []byte) (count int, e error) {
+ err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout))
+ if err != nil {
+ return 0, err
+ }
+ count, e = c.Conn.Read(b)
+ if e == nil {
+ stats.BytesIn(int64(count))
+ }
+ return
+}
+
+func (c *Conn) Write(b []byte) (count int, e error) {
+ err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
+ if err != nil {
+ return 0, err
+ }
+ count, e = c.Conn.Write(b)
+ if e == nil {
+ stats.BytesOut(int64(count))
+ }
+ return
+}
+
+func (c *Conn) Close() error {
+ stats.ConnectionClose()
+ return c.Conn.Close()
+}
+
+func NewListener(addr string, timeout time.Duration) (net.Listener, error) {
+ l, err := net.Listen("tcp", addr)
+ if err != nil {
+ return nil, err
+ }
+
+ tl := &Listener{
+ Listener: l,
+ ReadTimeout: timeout,
+ WriteTimeout: timeout,
+ }
+ return tl, nil
+}
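A sketch of wiring the timeout-aware listener into net/http; the address and timeout are arbitrary. Each Read and Write on an accepted connection sets a fresh deadline, so a connection that stays idle longer than the timeout gets dropped.

package main

import (
	"net/http"
	"time"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	// Every Read/Write on an accepted connection gets a fresh 30s deadline.
	listener, err := util.NewListener(":8080", 30*time.Second)
	if err != nil {
		panic(err)
	}
	http.Serve(listener, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok"))
	}))
}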
diff --git a/weed/util/parse.go b/weed/util/parse.go
new file mode 100644
index 000000000..0a8317c19
--- /dev/null
+++ b/weed/util/parse.go
@@ -0,0 +1,26 @@
+package util
+
+import (
+ "strconv"
+)
+
+func ParseInt(text string, defaultValue int) int {
+ count, parseError := strconv.ParseInt(text, 10, 64)
+ if parseError != nil {
+ if len(text) > 0 {
+ return 0
+ }
+ return defaultValue
+ }
+ return int(count)
+}
+func ParseUint64(text string, defaultValue uint64) uint64 {
+ count, parseError := strconv.ParseUint(text, 10, 64)
+ if parseError != nil {
+ if len(text) > 0 {
+ return 0
+ }
+ return defaultValue
+ }
+ return count
+}
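A sketch of the fallback behavior, which is slightly unusual: an empty string yields the caller's default, while a non-empty but unparseable string yields 0.

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	fmt.Println(util.ParseInt("42", 10))  // 42
	fmt.Println(util.ParseInt("", 10))    // 10: empty input falls back to the default
	fmt.Println(util.ParseInt("abc", 10)) // 0: non-empty but unparseable
	fmt.Println(util.ParseUint64("", 7))  // 7
}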