diff options
Diffstat (limited to '')
-rw-r--r-- | models/dbfs/dbfile.go | 368 | ||||
-rw-r--r-- | models/dbfs/dbfs.go | 131 | ||||
-rw-r--r-- | models/dbfs/dbfs_test.go | 191 | ||||
-rw-r--r-- | models/dbfs/main_test.go | 14 |
4 files changed, 704 insertions, 0 deletions
diff --git a/models/dbfs/dbfile.go b/models/dbfs/dbfile.go new file mode 100644 index 0000000..dd27b5c --- /dev/null +++ b/models/dbfs/dbfile.go @@ -0,0 +1,368 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package dbfs + +import ( + "context" + "errors" + "io" + "io/fs" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "code.gitea.io/gitea/models/db" +) + +var defaultFileBlockSize int64 = 32 * 1024 + +type File interface { + io.ReadWriteCloser + io.Seeker + fs.File +} + +type file struct { + ctx context.Context + metaID int64 + fullPath string + blockSize int64 + + allowRead bool + allowWrite bool + offset int64 +} + +var _ File = (*file)(nil) + +func (f *file) readAt(fileMeta *dbfsMeta, offset int64, p []byte) (n int, err error) { + if offset >= fileMeta.FileSize { + return 0, io.EOF + } + + blobPos := int(offset % f.blockSize) + blobOffset := offset - int64(blobPos) + blobRemaining := int(f.blockSize) - blobPos + needRead := len(p) + if needRead > blobRemaining { + needRead = blobRemaining + } + if blobOffset+int64(blobPos)+int64(needRead) > fileMeta.FileSize { + needRead = int(fileMeta.FileSize - blobOffset - int64(blobPos)) + } + if needRead <= 0 { + return 0, io.EOF + } + var fileData dbfsData + ok, err := db.GetEngine(f.ctx).Where("meta_id = ? AND blob_offset = ?", f.metaID, blobOffset).Get(&fileData) + if err != nil { + return 0, err + } + blobData := fileData.BlobData + if !ok { + blobData = nil + } + + canCopy := len(blobData) - blobPos + if canCopy <= 0 { + canCopy = 0 + } + realRead := needRead + if realRead > canCopy { + realRead = canCopy + } + if realRead > 0 { + copy(p[:realRead], fileData.BlobData[blobPos:blobPos+realRead]) + } + for i := realRead; i < needRead; i++ { + p[i] = 0 + } + return needRead, nil +} + +func (f *file) Read(p []byte) (n int, err error) { + if f.metaID == 0 || !f.allowRead { + return 0, os.ErrInvalid + } + + fileMeta, err := findFileMetaByID(f.ctx, f.metaID) + if err != nil { + return 0, err + } + n, err = f.readAt(fileMeta, f.offset, p) + f.offset += int64(n) + return n, err +} + +func (f *file) Write(p []byte) (n int, err error) { + if f.metaID == 0 || !f.allowWrite { + return 0, os.ErrInvalid + } + + fileMeta, err := findFileMetaByID(f.ctx, f.metaID) + if err != nil { + return 0, err + } + + needUpdateSize := false + written := 0 + for len(p) > 0 { + blobPos := int(f.offset % f.blockSize) + blobOffset := f.offset - int64(blobPos) + blobRemaining := int(f.blockSize) - blobPos + needWrite := len(p) + if needWrite > blobRemaining { + needWrite = blobRemaining + } + buf := make([]byte, f.blockSize) + readBytes, err := f.readAt(fileMeta, blobOffset, buf) + if err != nil && !errors.Is(err, io.EOF) { + return written, err + } + copy(buf[blobPos:blobPos+needWrite], p[:needWrite]) + if blobPos+needWrite > readBytes { + buf = buf[:blobPos+needWrite] + } else { + buf = buf[:readBytes] + } + + fileData := dbfsData{ + MetaID: fileMeta.ID, + BlobOffset: blobOffset, + BlobData: buf, + } + if res, err := db.GetEngine(f.ctx).Exec("UPDATE dbfs_data SET revision=revision+1, blob_data=? WHERE meta_id=? AND blob_offset=?", buf, fileMeta.ID, blobOffset); err != nil { + return written, err + } else if updated, err := res.RowsAffected(); err != nil { + return written, err + } else if updated == 0 { + if _, err = db.GetEngine(f.ctx).Insert(&fileData); err != nil { + return written, err + } + } + written += needWrite + f.offset += int64(needWrite) + if f.offset > fileMeta.FileSize { + fileMeta.FileSize = f.offset + needUpdateSize = true + } + p = p[needWrite:] + } + + fileMetaUpdate := dbfsMeta{ + ModifyTimestamp: timeToFileTimestamp(time.Now()), + } + if needUpdateSize { + fileMetaUpdate.FileSize = f.offset + } + if _, err := db.GetEngine(f.ctx).ID(fileMeta.ID).Update(fileMetaUpdate); err != nil { + return written, err + } + return written, nil +} + +func (f *file) Seek(n int64, whence int) (int64, error) { + if f.metaID == 0 { + return 0, os.ErrInvalid + } + + newOffset := f.offset + switch whence { + case io.SeekStart: + newOffset = n + case io.SeekCurrent: + newOffset += n + case io.SeekEnd: + size, err := f.size() + if err != nil { + return f.offset, err + } + newOffset = size + n + default: + return f.offset, os.ErrInvalid + } + if newOffset < 0 { + return f.offset, os.ErrInvalid + } + f.offset = newOffset + return newOffset, nil +} + +func (f *file) Close() error { + return nil +} + +func (f *file) Stat() (os.FileInfo, error) { + if f.metaID == 0 { + return nil, os.ErrInvalid + } + + fileMeta, err := findFileMetaByID(f.ctx, f.metaID) + if err != nil { + return nil, err + } + return fileMeta, nil +} + +func timeToFileTimestamp(t time.Time) int64 { + return t.UnixMicro() +} + +func fileTimestampToTime(timestamp int64) time.Time { + return time.UnixMicro(timestamp) +} + +func (f *file) loadMetaByPath() error { + var fileMeta dbfsMeta + if ok, err := db.GetEngine(f.ctx).Where("full_path = ?", f.fullPath).Get(&fileMeta); err != nil { + return err + } else if ok { + f.metaID = fileMeta.ID + f.blockSize = fileMeta.BlockSize + } + return nil +} + +func (f *file) open(flag int) (err error) { + // see os.OpenFile for flag values + if flag&os.O_WRONLY != 0 { + f.allowWrite = true + } else if flag&os.O_RDWR != 0 { + f.allowRead = true + f.allowWrite = true + } else /* O_RDONLY */ { + f.allowRead = true + } + + if f.allowWrite { + if flag&os.O_CREATE != 0 { + if flag&os.O_EXCL != 0 { + // file must not exist. + if f.metaID != 0 { + return os.ErrExist + } + } else { + // create a new file if none exists. + if f.metaID == 0 { + if err = f.createEmpty(); err != nil { + return err + } + } + } + } + if flag&os.O_TRUNC != 0 { + if err = f.truncate(); err != nil { + return err + } + } + if flag&os.O_APPEND != 0 { + if _, err = f.Seek(0, io.SeekEnd); err != nil { + return err + } + } + return nil + } + + // read only mode + if f.metaID == 0 { + return os.ErrNotExist + } + return nil +} + +func (f *file) createEmpty() error { + if f.metaID != 0 { + return os.ErrExist + } + now := time.Now() + _, err := db.GetEngine(f.ctx).Insert(&dbfsMeta{ + FullPath: f.fullPath, + BlockSize: f.blockSize, + CreateTimestamp: timeToFileTimestamp(now), + ModifyTimestamp: timeToFileTimestamp(now), + }) + if err != nil { + return err + } + return f.loadMetaByPath() +} + +func (f *file) truncate() error { + if f.metaID == 0 { + return os.ErrNotExist + } + return db.WithTx(f.ctx, func(ctx context.Context) error { + if _, err := db.GetEngine(ctx).Exec("UPDATE dbfs_meta SET file_size = 0 WHERE id = ?", f.metaID); err != nil { + return err + } + if _, err := db.GetEngine(ctx).Delete(&dbfsData{MetaID: f.metaID}); err != nil { + return err + } + return nil + }) +} + +func (f *file) renameTo(newPath string) error { + if f.metaID == 0 { + return os.ErrNotExist + } + newPath = buildPath(newPath) + return db.WithTx(f.ctx, func(ctx context.Context) error { + if _, err := db.GetEngine(ctx).Exec("UPDATE dbfs_meta SET full_path = ? WHERE id = ?", newPath, f.metaID); err != nil { + return err + } + return nil + }) +} + +func (f *file) delete() error { + if f.metaID == 0 { + return os.ErrNotExist + } + return db.WithTx(f.ctx, func(ctx context.Context) error { + if _, err := db.GetEngine(ctx).Delete(&dbfsMeta{ID: f.metaID}); err != nil { + return err + } + if _, err := db.GetEngine(ctx).Delete(&dbfsData{MetaID: f.metaID}); err != nil { + return err + } + return nil + }) +} + +func (f *file) size() (int64, error) { + if f.metaID == 0 { + return 0, os.ErrNotExist + } + fileMeta, err := findFileMetaByID(f.ctx, f.metaID) + if err != nil { + return 0, err + } + return fileMeta.FileSize, nil +} + +func findFileMetaByID(ctx context.Context, metaID int64) (*dbfsMeta, error) { + var fileMeta dbfsMeta + if ok, err := db.GetEngine(ctx).Where("id = ?", metaID).Get(&fileMeta); err != nil { + return nil, err + } else if ok { + return &fileMeta, nil + } + return nil, nil +} + +func buildPath(path string) string { + path = filepath.Clean(path) + path = strings.ReplaceAll(path, "\\", "/") + path = strings.TrimPrefix(path, "/") + return strconv.Itoa(strings.Count(path, "/")) + ":" + path +} + +func newDbFile(ctx context.Context, path string) (*file, error) { + path = buildPath(path) + f := &file{ctx: ctx, fullPath: path, blockSize: defaultFileBlockSize} + return f, f.loadMetaByPath() +} diff --git a/models/dbfs/dbfs.go b/models/dbfs/dbfs.go new file mode 100644 index 0000000..f68b4a2 --- /dev/null +++ b/models/dbfs/dbfs.go @@ -0,0 +1,131 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package dbfs + +import ( + "context" + "io/fs" + "os" + "path" + "time" + + "code.gitea.io/gitea/models/db" +) + +/* +The reasons behind the DBFS (database-filesystem) package: +When a Gitea action is running, the Gitea action server should collect and store all the logs. + +The requirements are: +* The running logs must be stored across the cluster if the Gitea servers are deployed as a cluster. +* The logs will be archived to Object Storage (S3/MinIO, etc.) after a period of time. +* The Gitea action UI should be able to render the running logs and the archived logs. + +Some possible solutions for the running logs: +* [Not ideal] Using local temp file: it can not be shared across the cluster. +* [Not ideal] Using shared file in the filesystem of git repository: although at the moment, the Gitea cluster's + git repositories must be stored in a shared filesystem, in the future, Gitea may need a dedicated Git Service Server + to decouple the shared filesystem. Then the action logs will become a blocker. +* [Not ideal] Record the logs in a database table line by line: it has a couple of problems: + - It's difficult to make multiple increasing sequence (log line number) for different databases. + - The database table will have a lot of rows and be affected by the big-table performance problem. + - It's difficult to load logs by using the same interface as other storages. + - It's difficult to calculate the size of the logs. + +The DBFS solution: +* It can be used in a cluster. +* It can share the same interface (Read/Write/Seek) as other storages. +* It's very friendly to database because it only needs to store much fewer rows than the log-line solution. +* In the future, when Gitea action needs to limit the log size (other CI/CD services also do so), it's easier to calculate the log file size. +* Even sometimes the UI needs to render the tailing lines, the tailing lines can be found be counting the "\n" from the end of the file by seek. + The seeking and finding is not the fastest way, but it's still acceptable and won't affect the performance too much. +*/ + +type dbfsMeta struct { + ID int64 `xorm:"pk autoincr"` + FullPath string `xorm:"VARCHAR(500) UNIQUE NOT NULL"` + BlockSize int64 `xorm:"BIGINT NOT NULL"` + FileSize int64 `xorm:"BIGINT NOT NULL"` + CreateTimestamp int64 `xorm:"BIGINT NOT NULL"` + ModifyTimestamp int64 `xorm:"BIGINT NOT NULL"` +} + +type dbfsData struct { + ID int64 `xorm:"pk autoincr"` + Revision int64 `xorm:"BIGINT NOT NULL"` + MetaID int64 `xorm:"BIGINT index(meta_offset) NOT NULL"` + BlobOffset int64 `xorm:"BIGINT index(meta_offset) NOT NULL"` + BlobSize int64 `xorm:"BIGINT NOT NULL"` + BlobData []byte `xorm:"BLOB NOT NULL"` +} + +func init() { + db.RegisterModel(new(dbfsMeta)) + db.RegisterModel(new(dbfsData)) +} + +func OpenFile(ctx context.Context, name string, flag int) (File, error) { + f, err := newDbFile(ctx, name) + if err != nil { + return nil, err + } + err = f.open(flag) + if err != nil { + _ = f.Close() + return nil, err + } + return f, nil +} + +func Open(ctx context.Context, name string) (File, error) { + return OpenFile(ctx, name, os.O_RDONLY) +} + +func Create(ctx context.Context, name string) (File, error) { + return OpenFile(ctx, name, os.O_RDWR|os.O_CREATE|os.O_TRUNC) +} + +func Rename(ctx context.Context, oldPath, newPath string) error { + f, err := newDbFile(ctx, oldPath) + if err != nil { + return err + } + defer f.Close() + return f.renameTo(newPath) +} + +func Remove(ctx context.Context, name string) error { + f, err := newDbFile(ctx, name) + if err != nil { + return err + } + defer f.Close() + return f.delete() +} + +var _ fs.FileInfo = (*dbfsMeta)(nil) + +func (m *dbfsMeta) Name() string { + return path.Base(m.FullPath) +} + +func (m *dbfsMeta) Size() int64 { + return m.FileSize +} + +func (m *dbfsMeta) Mode() fs.FileMode { + return os.ModePerm +} + +func (m *dbfsMeta) ModTime() time.Time { + return fileTimestampToTime(m.ModifyTimestamp) +} + +func (m *dbfsMeta) IsDir() bool { + return false +} + +func (m *dbfsMeta) Sys() any { + return nil +} diff --git a/models/dbfs/dbfs_test.go b/models/dbfs/dbfs_test.go new file mode 100644 index 0000000..3ad273a --- /dev/null +++ b/models/dbfs/dbfs_test.go @@ -0,0 +1,191 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package dbfs + +import ( + "bufio" + "io" + "os" + "testing" + + "code.gitea.io/gitea/models/db" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func changeDefaultFileBlockSize(n int64) (restore func()) { + old := defaultFileBlockSize + defaultFileBlockSize = n + return func() { + defaultFileBlockSize = old + } +} + +func TestDbfsBasic(t *testing.T) { + defer changeDefaultFileBlockSize(4)() + + // test basic write/read + f, err := OpenFile(db.DefaultContext, "test.txt", os.O_RDWR|os.O_CREATE) + require.NoError(t, err) + + n, err := f.Write([]byte("0123456789")) // blocks: 0123 4567 89 + require.NoError(t, err) + assert.EqualValues(t, 10, n) + + _, err = f.Seek(0, io.SeekStart) + require.NoError(t, err) + + buf, err := io.ReadAll(f) + require.NoError(t, err) + assert.EqualValues(t, 10, n) + assert.EqualValues(t, "0123456789", string(buf)) + + // write some new data + _, err = f.Seek(1, io.SeekStart) + require.NoError(t, err) + _, err = f.Write([]byte("bcdefghi")) // blocks: 0bcd efgh i9 + require.NoError(t, err) + + // read from offset + buf, err = io.ReadAll(f) + require.NoError(t, err) + assert.EqualValues(t, "9", string(buf)) + + // read all + _, err = f.Seek(0, io.SeekStart) + require.NoError(t, err) + buf, err = io.ReadAll(f) + require.NoError(t, err) + assert.EqualValues(t, "0bcdefghi9", string(buf)) + + // write to new size + _, err = f.Seek(-1, io.SeekEnd) + require.NoError(t, err) + _, err = f.Write([]byte("JKLMNOP")) // blocks: 0bcd efgh iJKL MNOP + require.NoError(t, err) + _, err = f.Seek(0, io.SeekStart) + require.NoError(t, err) + buf, err = io.ReadAll(f) + require.NoError(t, err) + assert.EqualValues(t, "0bcdefghiJKLMNOP", string(buf)) + + // write beyond EOF and fill with zero + _, err = f.Seek(5, io.SeekCurrent) + require.NoError(t, err) + _, err = f.Write([]byte("xyzu")) // blocks: 0bcd efgh iJKL MNOP 0000 0xyz u + require.NoError(t, err) + _, err = f.Seek(0, io.SeekStart) + require.NoError(t, err) + buf, err = io.ReadAll(f) + require.NoError(t, err) + assert.EqualValues(t, "0bcdefghiJKLMNOP\x00\x00\x00\x00\x00xyzu", string(buf)) + + // write to the block with zeros + _, err = f.Seek(-6, io.SeekCurrent) + require.NoError(t, err) + _, err = f.Write([]byte("ABCD")) // blocks: 0bcd efgh iJKL MNOP 000A BCDz u + require.NoError(t, err) + _, err = f.Seek(0, io.SeekStart) + require.NoError(t, err) + buf, err = io.ReadAll(f) + require.NoError(t, err) + assert.EqualValues(t, "0bcdefghiJKLMNOP\x00\x00\x00ABCDzu", string(buf)) + + require.NoError(t, f.Close()) + + // test rename + err = Rename(db.DefaultContext, "test.txt", "test2.txt") + require.NoError(t, err) + + _, err = OpenFile(db.DefaultContext, "test.txt", os.O_RDONLY) + require.Error(t, err) + + f, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY) + require.NoError(t, err) + require.NoError(t, f.Close()) + + // test remove + err = Remove(db.DefaultContext, "test2.txt") + require.NoError(t, err) + + _, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY) + require.Error(t, err) + + // test stat + f, err = OpenFile(db.DefaultContext, "test/test.txt", os.O_RDWR|os.O_CREATE) + require.NoError(t, err) + stat, err := f.Stat() + require.NoError(t, err) + assert.EqualValues(t, "test.txt", stat.Name()) + assert.EqualValues(t, 0, stat.Size()) + _, err = f.Write([]byte("0123456789")) + require.NoError(t, err) + stat, err = f.Stat() + require.NoError(t, err) + assert.EqualValues(t, 10, stat.Size()) +} + +func TestDbfsReadWrite(t *testing.T) { + defer changeDefaultFileBlockSize(4)() + + f1, err := OpenFile(db.DefaultContext, "test.log", os.O_RDWR|os.O_CREATE) + require.NoError(t, err) + defer f1.Close() + + f2, err := OpenFile(db.DefaultContext, "test.log", os.O_RDONLY) + require.NoError(t, err) + defer f2.Close() + + _, err = f1.Write([]byte("line 1\n")) + require.NoError(t, err) + + f2r := bufio.NewReader(f2) + + line, err := f2r.ReadString('\n') + require.NoError(t, err) + assert.EqualValues(t, "line 1\n", line) + _, err = f2r.ReadString('\n') + require.ErrorIs(t, err, io.EOF) + + _, err = f1.Write([]byte("line 2\n")) + require.NoError(t, err) + + line, err = f2r.ReadString('\n') + require.NoError(t, err) + assert.EqualValues(t, "line 2\n", line) + _, err = f2r.ReadString('\n') + require.ErrorIs(t, err, io.EOF) +} + +func TestDbfsSeekWrite(t *testing.T) { + defer changeDefaultFileBlockSize(4)() + + f, err := OpenFile(db.DefaultContext, "test2.log", os.O_RDWR|os.O_CREATE) + require.NoError(t, err) + defer f.Close() + + n, err := f.Write([]byte("111")) + require.NoError(t, err) + + _, err = f.Seek(int64(n), io.SeekStart) + require.NoError(t, err) + + _, err = f.Write([]byte("222")) + require.NoError(t, err) + + _, err = f.Seek(int64(n), io.SeekStart) + require.NoError(t, err) + + _, err = f.Write([]byte("333")) + require.NoError(t, err) + + fr, err := OpenFile(db.DefaultContext, "test2.log", os.O_RDONLY) + require.NoError(t, err) + defer f.Close() + + buf, err := io.ReadAll(fr) + require.NoError(t, err) + assert.EqualValues(t, "111333", string(buf)) +} diff --git a/models/dbfs/main_test.go b/models/dbfs/main_test.go new file mode 100644 index 0000000..537ba09 --- /dev/null +++ b/models/dbfs/main_test.go @@ -0,0 +1,14 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package dbfs + +import ( + "testing" + + "code.gitea.io/gitea/models/unittest" +) + +func TestMain(m *testing.M) { + unittest.MainTest(m) +} |