use vgo

This commit is contained in:
magicsong
2019-07-19 18:20:58 +08:00
committed by zryfish
parent 45243c987d
commit c6ad0ebc3e
1267 changed files with 48006 additions and 311454 deletions

View File

@@ -22,22 +22,12 @@ const (
defaultPathPerm os.FileMode = 0777
)
// PathKey represents a string key that has been transformed to
// a directory and file name where the content will eventually
// be stored
type PathKey struct {
Path []string
FileName string
originalKey string
}
var (
defaultAdvancedTransform = func(s string) *PathKey { return &PathKey{Path: []string{}, FileName: s} }
defaultInverseTransform = func(pathKey *PathKey) string { return pathKey.FileName }
errCanceled = errors.New("canceled")
errEmptyKey = errors.New("empty key")
errBadKey = errors.New("bad key")
errImportDirectory = errors.New("can't import a directory")
defaultTransform = func(s string) []string { return []string{} }
errCanceled = errors.New("canceled")
errEmptyKey = errors.New("empty key")
errBadKey = errors.New("bad key")
errImportDirectory = errors.New("can't import a directory")
)
// TransformFunction transforms a key into a slice of strings, with each
@@ -48,34 +38,14 @@ var (
// the final location of the data file will be <basedir>/ab/cde/f/abcdef
type TransformFunction func(s string) []string
// AdvancedTransformFunction transforms a key into a PathKey.
//
// A PathKey contains a slice of strings, where each element in the slice
// represents a directory in the file path where the key's entry will eventually
// be stored, as well as the filename.
//
// For example, if AdvancedTransformFunc transforms "abcdef/file.txt" to the
// PathKey {Path: ["ab", "cde", "f"], FileName: "file.txt"}, the final location
// of the data file will be <basedir>/ab/cde/f/file.txt.
//
// You must provide an InverseTransformFunction if you use an
// AdvancedTransformFunction.
type AdvancedTransformFunction func(s string) *PathKey
// InverseTransformFunction takes a PathKey and converts it back to a Diskv key.
// In effect, it's the opposite of an AdvancedTransformFunction.
type InverseTransformFunction func(pathKey *PathKey) string
// Options define a set of properties that dictate Diskv behavior.
// All values are optional.
type Options struct {
BasePath string
Transform TransformFunction
AdvancedTransform AdvancedTransformFunction
InverseTransform InverseTransformFunction
CacheSizeMax uint64 // bytes
PathPerm os.FileMode
FilePerm os.FileMode
BasePath string
Transform TransformFunction
CacheSizeMax uint64 // bytes
PathPerm os.FileMode
FilePerm os.FileMode
// If TempDir is set, it will enable filesystem atomic writes by
// writing temporary files to that location before being moved
// to BasePath.
@@ -105,22 +75,9 @@ func New(o Options) *Diskv {
if o.BasePath == "" {
o.BasePath = defaultBasePath
}
if o.AdvancedTransform == nil {
if o.Transform == nil {
o.AdvancedTransform = defaultAdvancedTransform
} else {
o.AdvancedTransform = convertToAdvancedTransform(o.Transform)
}
if o.InverseTransform == nil {
o.InverseTransform = defaultInverseTransform
}
} else {
if o.InverseTransform == nil {
panic("You must provide an InverseTransform function in advanced mode")
}
if o.Transform == nil {
o.Transform = defaultTransform
}
if o.PathPerm == 0 {
o.PathPerm = defaultPathPerm
}
@@ -141,30 +98,11 @@ func New(o Options) *Diskv {
return d
}
// convertToAdvancedTransform takes a classic Transform function and
// converts it to the new AdvancedTransform
func convertToAdvancedTransform(oldFunc func(s string) []string) AdvancedTransformFunction {
return func(s string) *PathKey {
return &PathKey{Path: oldFunc(s), FileName: s}
}
}
// Write synchronously writes the key-value pair to disk, making it immediately
// available for reads. Write relies on the filesystem to perform an eventual
// sync to physical media. If you need stronger guarantees, see WriteStream.
func (d *Diskv) Write(key string, val []byte) error {
return d.WriteStream(key, bytes.NewReader(val), false)
}
// WriteString writes a string key-value pair to disk
func (d *Diskv) WriteString(key string, val string) error {
return d.Write(key, []byte(val))
}
func (d *Diskv) transform(key string) (pathKey *PathKey) {
pathKey = d.AdvancedTransform(key)
pathKey.originalKey = key
return pathKey
return d.WriteStream(key, bytes.NewBuffer(val), false)
}
// WriteStream writes the data represented by the io.Reader to the disk, under
@@ -177,28 +115,15 @@ func (d *Diskv) WriteStream(key string, r io.Reader, sync bool) error {
return errEmptyKey
}
pathKey := d.transform(key)
// Ensure keys cannot evaluate to paths that would not exist
for _, pathPart := range pathKey.Path {
if strings.ContainsRune(pathPart, os.PathSeparator) {
return errBadKey
}
}
if strings.ContainsRune(pathKey.FileName, os.PathSeparator) {
return errBadKey
}
d.mu.Lock()
defer d.mu.Unlock()
return d.writeStreamWithLock(pathKey, r, sync)
return d.writeStreamWithLock(key, r, sync)
}
// createKeyFileWithLock either creates the key file directly, or
// creates a temporary file in TempDir if it is set.
func (d *Diskv) createKeyFileWithLock(pathKey *PathKey) (*os.File, error) {
func (d *Diskv) createKeyFileWithLock(key string) (*os.File, error) {
if d.TempDir != "" {
if err := os.MkdirAll(d.TempDir, d.PathPerm); err != nil {
return nil, fmt.Errorf("temp mkdir: %s", err)
@@ -217,7 +142,7 @@ func (d *Diskv) createKeyFileWithLock(pathKey *PathKey) (*os.File, error) {
}
mode := os.O_WRONLY | os.O_CREATE | os.O_TRUNC // overwrite if exists
f, err := os.OpenFile(d.completeFilename(pathKey), mode, d.FilePerm)
f, err := os.OpenFile(d.completeFilename(key), mode, d.FilePerm)
if err != nil {
return nil, fmt.Errorf("open file: %s", err)
}
@@ -225,12 +150,12 @@ func (d *Diskv) createKeyFileWithLock(pathKey *PathKey) (*os.File, error) {
}
// writeStream does no input validation checking.
func (d *Diskv) writeStreamWithLock(pathKey *PathKey, r io.Reader, sync bool) error {
if err := d.ensurePathWithLock(pathKey); err != nil {
func (d *Diskv) writeStreamWithLock(key string, r io.Reader, sync bool) error {
if err := d.ensurePathWithLock(key); err != nil {
return fmt.Errorf("ensure path: %s", err)
}
f, err := d.createKeyFileWithLock(pathKey)
f, err := d.createKeyFileWithLock(key)
if err != nil {
return fmt.Errorf("create key file: %s", err)
}
@@ -269,19 +194,18 @@ func (d *Diskv) writeStreamWithLock(pathKey *PathKey, r io.Reader, sync bool) er
return fmt.Errorf("file close: %s", err)
}
fullPath := d.completeFilename(pathKey)
if f.Name() != fullPath {
if err := os.Rename(f.Name(), fullPath); err != nil {
if f.Name() != d.completeFilename(key) {
if err := os.Rename(f.Name(), d.completeFilename(key)); err != nil {
os.Remove(f.Name()) // error deliberately ignored
return fmt.Errorf("rename: %s", err)
}
}
if d.Index != nil {
d.Index.Insert(pathKey.originalKey)
d.Index.Insert(key)
}
d.bustCacheWithLock(pathKey.originalKey) // cache only on read
d.bustCacheWithLock(key) // cache only on read
return nil
}
@@ -300,18 +224,16 @@ func (d *Diskv) Import(srcFilename, dstKey string, move bool) (err error) {
return errImportDirectory
}
dstPathKey := d.transform(dstKey)
d.mu.Lock()
defer d.mu.Unlock()
if err := d.ensurePathWithLock(dstPathKey); err != nil {
if err := d.ensurePathWithLock(dstKey); err != nil {
return fmt.Errorf("ensure path: %s", err)
}
if move {
if err := syscall.Rename(srcFilename, d.completeFilename(dstPathKey)); err == nil {
d.bustCacheWithLock(dstPathKey.originalKey)
if err := syscall.Rename(srcFilename, d.completeFilename(dstKey)); err == nil {
d.bustCacheWithLock(dstKey)
return nil
} else if err != syscall.EXDEV {
// If it failed due to being on a different device, fall back to copying
@@ -324,7 +246,7 @@ func (d *Diskv) Import(srcFilename, dstKey string, move bool) (err error) {
return err
}
defer f.Close()
err = d.writeStreamWithLock(dstPathKey, f, false)
err = d.writeStreamWithLock(dstKey, f, false)
if err == nil && move {
err = os.Remove(srcFilename)
}
@@ -344,13 +266,6 @@ func (d *Diskv) Read(key string) ([]byte, error) {
return ioutil.ReadAll(rc)
}
// ReadString reads the key and returns a string value
// In case of error, an empty string is returned
func (d *Diskv) ReadString(key string) string {
value, _ := d.Read(key)
return string(value)
}
// ReadStream reads the key and returns the value (data) as an io.ReadCloser.
// If the value is cached from a previous read, and direct is false,
// ReadStream will use the cached value. Otherwise, it will return a handle to
@@ -362,14 +277,12 @@ func (d *Diskv) ReadString(key string) string {
// If compression is enabled, ReadStream taps into the io.Reader stream prior
// to decompression, and caches the compressed data.
func (d *Diskv) ReadStream(key string, direct bool) (io.ReadCloser, error) {
pathKey := d.transform(key)
d.mu.RLock()
defer d.mu.RUnlock()
if val, ok := d.cache[key]; ok {
if !direct {
buf := bytes.NewReader(val)
buf := bytes.NewBuffer(val)
if d.Compression != nil {
return d.Compression.Reader(buf)
}
@@ -383,15 +296,15 @@ func (d *Diskv) ReadStream(key string, direct bool) (io.ReadCloser, error) {
}()
}
return d.readWithRLock(pathKey)
return d.readWithRLock(key)
}
// read ignores the cache, and returns an io.ReadCloser representing the
// decompressed data for the given key, streamed from the disk. Clients should
// acquire a read lock on the Diskv and check the cache themselves before
// calling read.
func (d *Diskv) readWithRLock(pathKey *PathKey) (io.ReadCloser, error) {
filename := d.completeFilename(pathKey)
func (d *Diskv) readWithRLock(key string) (io.ReadCloser, error) {
filename := d.completeFilename(key)
fi, err := os.Stat(filename)
if err != nil {
@@ -408,7 +321,7 @@ func (d *Diskv) readWithRLock(pathKey *PathKey) (io.ReadCloser, error) {
var r io.Reader
if d.CacheSizeMax > 0 {
r = newSiphon(f, d, pathKey.originalKey)
r = newSiphon(f, d, key)
} else {
r = &closingReader{f}
}
@@ -482,7 +395,6 @@ func (s *siphon) Read(p []byte) (int, error) {
// Erase synchronously erases the given key from the disk and the cache.
func (d *Diskv) Erase(key string) error {
pathKey := d.transform(key)
d.mu.Lock()
defer d.mu.Unlock()
@@ -494,7 +406,7 @@ func (d *Diskv) Erase(key string) error {
}
// erase from disk
filename := d.completeFilename(pathKey)
filename := d.completeFilename(key)
if s, err := os.Stat(filename); err == nil {
if s.IsDir() {
return errBadKey
@@ -529,7 +441,6 @@ func (d *Diskv) EraseAll() error {
// Has returns true if the given key exists.
func (d *Diskv) Has(key string) bool {
pathKey := d.transform(key)
d.mu.Lock()
defer d.mu.Unlock()
@@ -537,7 +448,7 @@ func (d *Diskv) Has(key string) bool {
return true
}
filename := d.completeFilename(pathKey)
filename := d.completeFilename(key)
s, err := os.Stat(filename)
if err != nil {
return false
@@ -565,12 +476,11 @@ func (d *Diskv) KeysPrefix(prefix string, cancel <-chan struct{}) <-chan string
if prefix == "" {
prepath = d.BasePath
} else {
prefixKey := d.transform(prefix)
prepath = d.pathFor(prefixKey)
prepath = d.pathFor(prefix)
}
c := make(chan string)
go func() {
filepath.Walk(prepath, d.walker(c, prefix, cancel))
filepath.Walk(prepath, walker(c, prefix, cancel))
close(c)
}()
return c
@@ -578,30 +488,18 @@ func (d *Diskv) KeysPrefix(prefix string, cancel <-chan struct{}) <-chan string
// walker returns a function which satisfies the filepath.WalkFunc interface.
// It sends every non-directory file entry down the channel c.
func (d *Diskv) walker(c chan<- string, prefix string, cancel <-chan struct{}) filepath.WalkFunc {
func walker(c chan<- string, prefix string, cancel <-chan struct{}) filepath.WalkFunc {
return func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
relPath, _ := filepath.Rel(d.BasePath, path)
dir, file := filepath.Split(relPath)
pathSplit := strings.Split(dir, string(filepath.Separator))
pathSplit = pathSplit[:len(pathSplit)-1]
pathKey := &PathKey{
Path: pathSplit,
FileName: file,
}
key := d.InverseTransform(pathKey)
if info.IsDir() || !strings.HasPrefix(key, prefix) {
if info.IsDir() || !strings.HasPrefix(info.Name(), prefix) {
return nil // "pass"
}
select {
case c <- key:
case c <- info.Name():
case <-cancel:
return errCanceled
}
@@ -612,19 +510,19 @@ func (d *Diskv) walker(c chan<- string, prefix string, cancel <-chan struct{}) f
// pathFor returns the absolute path for location on the filesystem where the
// data for the given key will be stored.
func (d *Diskv) pathFor(pathKey *PathKey) string {
return filepath.Join(d.BasePath, filepath.Join(pathKey.Path...))
func (d *Diskv) pathFor(key string) string {
return filepath.Join(d.BasePath, filepath.Join(d.Transform(key)...))
}
// ensurePathWithLock is a helper function that generates all necessary
// directories on the filesystem for the given key.
func (d *Diskv) ensurePathWithLock(pathKey *PathKey) error {
return os.MkdirAll(d.pathFor(pathKey), d.PathPerm)
func (d *Diskv) ensurePathWithLock(key string) error {
return os.MkdirAll(d.pathFor(key), d.PathPerm)
}
// completeFilename returns the absolute path to the file for the given key.
func (d *Diskv) completeFilename(pathKey *PathKey) string {
return filepath.Join(d.pathFor(pathKey), pathKey.FileName)
func (d *Diskv) completeFilename(key string) string {
return filepath.Join(d.pathFor(key), key)
}
// cacheWithLock attempts to cache the given key-value pair in the store's
@@ -666,7 +564,7 @@ func (d *Diskv) uncacheWithLock(key string, sz uint64) {
// pruneDirsWithLock deletes empty directories in the path walk leading to the
// key k. Typically this function is called after an Erase is made.
func (d *Diskv) pruneDirsWithLock(key string) error {
pathlist := d.transform(key).Path
pathlist := d.Transform(key)
for i := range pathlist {
dir := filepath.Join(d.BasePath, filepath.Join(pathlist[:len(pathlist)-i]...))