summaryrefslogtreecommitdiffstats
path: root/fs/ceph/dir.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/dir.c')
-rw-r--r--fs/ceph/dir.c132
1 files changed, 121 insertions, 11 deletions
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index d0cd0aba5843..d594c2627430 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -335,8 +335,11 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
ctx->pos = 2;
}
- /* can we use the dcache? */
spin_lock(&ci->i_ceph_lock);
+ /* request Fx cap. if have Fx, we don't need to release Fs cap
+ * for later create/unlink. */
+ __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_WR);
+ /* can we use the dcache? */
if (ceph_test_mount_opt(fsc, DCACHE) &&
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
ceph_snap(inode) != CEPH_SNAPDIR &&
@@ -752,7 +755,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
struct ceph_dentry_info *di = ceph_dentry(dentry);
spin_lock(&ci->i_ceph_lock);
- dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags);
+ dout(" dir %p flags are 0x%lx\n", dir, ci->i_ceph_flags);
if (strncmp(dentry->d_name.name,
fsc->mount_options->snapdir_name,
dentry->d_name.len) &&
@@ -760,6 +763,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
ceph_test_mount_opt(fsc, DCACHE) &&
__ceph_dir_is_complete(ci) &&
(__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
+ __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
spin_unlock(&ci->i_ceph_lock);
dout(" dir %p complete, -ENOENT\n", dir);
d_add(dentry, NULL);
@@ -1036,6 +1040,78 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
return err;
}
+static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ int result = req->r_err ? req->r_err :
+ le32_to_cpu(req->r_reply_info.head->result);
+
+ if (result == -EJUKEBOX)
+ goto out;
+
+ /* If op failed, mark everyone involved for errors */
+ if (result) {
+ int pathlen;
+ u64 base;
+ char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
+ &base, 0);
+
+ /* mark error on parent + clear complete */
+ mapping_set_error(req->r_parent->i_mapping, result);
+ ceph_dir_clear_complete(req->r_parent);
+
+ /* drop the dentry -- we don't know its status */
+ if (!d_unhashed(req->r_dentry))
+ d_drop(req->r_dentry);
+
+ /* mark inode itself for an error (since metadata is bogus) */
+ mapping_set_error(req->r_old_inode->i_mapping, result);
+
+ pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n",
+ base, IS_ERR(path) ? "<<bad>>" : path, result);
+ ceph_mdsc_free_path(path, pathlen);
+ }
+out:
+ iput(req->r_old_inode);
+ ceph_mdsc_release_dir_caps(req);
+}
+
+static int get_caps_for_async_unlink(struct inode *dir, struct dentry *dentry)
+{
+ struct ceph_inode_info *ci = ceph_inode(dir);
+ struct ceph_dentry_info *di;
+ int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_UNLINK;
+
+ spin_lock(&ci->i_ceph_lock);
+ if ((__ceph_caps_issued(ci, NULL) & want) == want) {
+ ceph_take_cap_refs(ci, want, false);
+ got = want;
+ }
+ spin_unlock(&ci->i_ceph_lock);
+
+ /* If we didn't get anything, return 0 */
+ if (!got)
+ return 0;
+
+ spin_lock(&dentry->d_lock);
+ di = ceph_dentry(dentry);
+ /*
+ * - We are holding Fx, which implies Fs caps.
+ * - Only support async unlink for primary linkage
+ */
+ if (atomic_read(&ci->i_shared_gen) != di->lease_shared_gen ||
+ !(di->flags & CEPH_DENTRY_PRIMARY_LINK))
+ want = 0;
+ spin_unlock(&dentry->d_lock);
+
+ /* Do we still want what we've got? */
+ if (want == got)
+ return got;
+
+ ceph_put_cap_refs(ci, got);
+ return 0;
+}
+
/*
* rmdir and unlink are differ only by the metadata op code
*/
@@ -1045,6 +1121,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
struct ceph_mds_client *mdsc = fsc->mdsc;
struct inode *inode = d_inode(dentry);
struct ceph_mds_request *req;
+ bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
int err = -EROFS;
int op;
@@ -1059,6 +1136,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
} else
goto out;
+retry:
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
if (IS_ERR(req)) {
err = PTR_ERR(req);
@@ -1067,13 +1145,39 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
req->r_parent = dir;
- set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
req->r_inode_drop = ceph_drop_caps_for_unlink(inode);
- err = ceph_mdsc_do_request(mdsc, dir, req);
- if (!err && !req->r_reply_info.head->is_dentry)
- d_delete(dentry);
+
+ if (try_async && op == CEPH_MDS_OP_UNLINK &&
+ (req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) {
+ dout("async unlink on %lu/%.*s caps=%s", dir->i_ino,
+ dentry->d_name.len, dentry->d_name.name,
+ ceph_cap_string(req->r_dir_caps));
+ set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
+ req->r_callback = ceph_async_unlink_cb;
+ req->r_old_inode = d_inode(dentry);
+ ihold(req->r_old_inode);
+ err = ceph_mdsc_submit_request(mdsc, dir, req);
+ if (!err) {
+ /*
+ * We have enough caps, so we assume that the unlink
+ * will succeed. Fix up the target inode and dcache.
+ */
+ drop_nlink(inode);
+ d_delete(dentry);
+ } else if (err == -EJUKEBOX) {
+ try_async = false;
+ ceph_mdsc_put_request(req);
+ goto retry;
+ }
+ } else {
+ set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
+ err = ceph_mdsc_do_request(mdsc, dir, req);
+ if (!err && !req->r_reply_info.head->is_dentry)
+ d_delete(dentry);
+ }
+
ceph_mdsc_put_request(req);
out:
return err;
@@ -1411,6 +1515,7 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry)
spin_lock(&dentry->d_lock);
di->time = jiffies;
di->lease_shared_gen = 0;
+ di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
__dentry_lease_unlist(di);
spin_unlock(&dentry->d_lock);
}
@@ -1520,7 +1625,8 @@ static int __dir_lease_try_check(const struct dentry *dentry)
/*
* Check if directory-wide content lease/cap is valid.
*/
-static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
+static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry,
+ struct ceph_mds_client *mdsc)
{
struct ceph_inode_info *ci = ceph_inode(dir);
int valid;
@@ -1528,7 +1634,10 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
spin_lock(&ci->i_ceph_lock);
valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
- shared_gen = atomic_read(&ci->i_shared_gen);
+ if (valid) {
+ __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
+ shared_gen = atomic_read(&ci->i_shared_gen);
+ }
spin_unlock(&ci->i_ceph_lock);
if (valid) {
struct ceph_dentry_info *di;
@@ -1554,6 +1663,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
int valid = 0;
struct dentry *parent;
struct inode *dir, *inode;
+ struct ceph_mds_client *mdsc;
if (flags & LOOKUP_RCU) {
parent = READ_ONCE(dentry->d_parent);
@@ -1570,6 +1680,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
dout("d_revalidate %p '%pd' inode %p offset 0x%llx\n", dentry,
dentry, inode, ceph_dentry(dentry)->offset);
+ mdsc = ceph_sb_to_client(dir->i_sb)->mdsc;
+
/* always trust cached snapped dentries, snapdir dentry */
if (ceph_snap(dir) != CEPH_NOSNAP) {
dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
@@ -1581,7 +1693,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
valid = dentry_lease_is_valid(dentry, flags);
if (valid == -ECHILD)
return valid;
- if (valid || dir_lease_is_valid(dir, dentry)) {
+ if (valid || dir_lease_is_valid(dir, dentry, mdsc)) {
if (inode)
valid = ceph_is_any_caps(inode);
else
@@ -1590,8 +1702,6 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
}
if (!valid) {
- struct ceph_mds_client *mdsc =
- ceph_sb_to_client(dir->i_sb)->mdsc;
struct ceph_mds_request *req;
int op, err;
u32 mask;