Diffstat (limited to 'drivers/staging/lustre/lustre/osc')
-rw-r--r--  drivers/staging/lustre/lustre/osc/lproc_osc.c         41
-rw-r--r--  drivers/staging/lustre/lustre/osc/osc_cache.c        279
-rw-r--r--  drivers/staging/lustre/lustre/osc/osc_cl_internal.h    6
-rw-r--r--  drivers/staging/lustre/lustre/osc/osc_internal.h       9
-rw-r--r--  drivers/staging/lustre/lustre/osc/osc_io.c            46
-rw-r--r--  drivers/staging/lustre/lustre/osc/osc_lock.c           4
-rw-r--r--  drivers/staging/lustre/lustre/osc/osc_object.c         7
-rw-r--r--  drivers/staging/lustre/lustre/osc/osc_page.c          278
-rw-r--r--  drivers/staging/lustre/lustre/osc/osc_request.c       400
9 files changed, 453 insertions, 617 deletions
diff --git a/drivers/staging/lustre/lustre/osc/lproc_osc.c b/drivers/staging/lustre/lustre/osc/lproc_osc.c
index 7e83d395b998..f0062d44ee03 100644
--- a/drivers/staging/lustre/lustre/osc/lproc_osc.c
+++ b/drivers/staging/lustre/lustre/osc/lproc_osc.c
@@ -119,6 +119,7 @@ static ssize_t max_rpcs_in_flight_store(struct kobject *kobj,
spin_lock(&cli->cl_loi_list_lock);
cli->cl_max_rpcs_in_flight = val;
+ client_adjust_max_dirty(cli);
spin_unlock(&cli->cl_loi_list_lock);
return count;
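
The hunk above re-clamps the client's dirty-page ceiling whenever max_rpcs_in_flight changes; the same helper is invoked from max_pages_per_rpc_store() further down in this file. client_adjust_max_dirty() itself is defined outside this diff; a minimal sketch of what such a helper plausibly does, assuming it only caps cl_dirty_max_pages to the RPC window (the caller holds cl_loi_list_lock at both call sites):

static void client_adjust_max_dirty_sketch(struct client_obd *cli)
{
	/* Hypothetical: never cache more dirty pages than one full RPC
	 * window (max_pages_per_rpc * max_rpcs_in_flight) can drain.
	 */
	unsigned long dirty_max = (unsigned long)cli->cl_max_rpcs_in_flight *
				  cli->cl_max_pages_per_rpc;

	if (dirty_max < cli->cl_dirty_max_pages)
		cli->cl_dirty_max_pages = dirty_max;
}
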
@@ -136,10 +137,10 @@ static ssize_t max_dirty_mb_show(struct kobject *kobj,
int mult;
spin_lock(&cli->cl_loi_list_lock);
- val = cli->cl_dirty_max;
+ val = cli->cl_dirty_max_pages;
spin_unlock(&cli->cl_loi_list_lock);
- mult = 1 << 20;
+ mult = 1 << (20 - PAGE_SHIFT);
return lprocfs_read_frac_helper(buf, PAGE_SIZE, val, mult);
}
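
With cl_dirty_max_pages now counted in pages rather than bytes, the show path divides by pages-per-MiB instead of bytes-per-MiB, hence mult = 1 << (20 - PAGE_SHIFT). A standalone check of the conversion, assuming 4 KiB pages (PAGE_SHIFT == 12):

#include <assert.h>

int main(void)
{
	const unsigned int page_shift = 12;		/* 4 KiB pages */
	unsigned long pages = 1024;			/* cl_dirty_max_pages */
	unsigned long mult = 1UL << (20 - page_shift);	/* 256 pages per MiB */

	assert(pages / mult == 4);			/* 1024 pages == 4 MiB */
	return 0;
}
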
@@ -166,7 +167,7 @@ static ssize_t max_dirty_mb_store(struct kobject *kobj,
return -ERANGE;
spin_lock(&cli->cl_loi_list_lock);
- cli->cl_dirty_max = (u32)(pages_number << PAGE_SHIFT);
+ cli->cl_dirty_max_pages = pages_number;
osc_wake_cache_waiters(cli);
spin_unlock(&cli->cl_loi_list_lock);
@@ -181,11 +182,11 @@ static int osc_cached_mb_seq_show(struct seq_file *m, void *v)
int shift = 20 - PAGE_SHIFT;
seq_printf(m,
- "used_mb: %d\n"
- "busy_cnt: %d\n",
- (atomic_read(&cli->cl_lru_in_list) +
- atomic_read(&cli->cl_lru_busy)) >> shift,
- atomic_read(&cli->cl_lru_busy));
+ "used_mb: %ld\n"
+ "busy_cnt: %ld\n",
+ (atomic_long_read(&cli->cl_lru_in_list) +
+ atomic_long_read(&cli->cl_lru_busy)) >> shift,
+ atomic_long_read(&cli->cl_lru_busy));
return 0;
}
@@ -197,8 +198,10 @@ static ssize_t osc_cached_mb_seq_write(struct file *file,
{
struct obd_device *dev = ((struct seq_file *)file->private_data)->private;
struct client_obd *cli = &dev->u.cli;
- int pages_number, mult, rc;
+ long pages_number, rc;
char kernbuf[128];
+ int mult;
+ u64 val;
if (count >= sizeof(kernbuf))
return -EINVAL;
@@ -210,14 +213,18 @@ static ssize_t osc_cached_mb_seq_write(struct file *file,
mult = 1 << (20 - PAGE_SHIFT);
buffer += lprocfs_find_named_value(kernbuf, "used_mb:", &count) -
kernbuf;
- rc = lprocfs_write_frac_helper(buffer, count, &pages_number, mult);
+ rc = lprocfs_write_frac_u64_helper(buffer, count, &val, mult);
if (rc)
return rc;
+ if (val > LONG_MAX)
+ return -ERANGE;
+ pages_number = (long)val;
+
if (pages_number < 0)
return -ERANGE;
- rc = atomic_read(&cli->cl_lru_in_list) - pages_number;
+ rc = atomic_long_read(&cli->cl_lru_in_list) - pages_number;
if (rc > 0) {
struct lu_env *env;
int refcheck;
@@ -244,7 +251,7 @@ static ssize_t cur_dirty_bytes_show(struct kobject *kobj,
int len;
spin_lock(&cli->cl_loi_list_lock);
- len = sprintf(buf, "%lu\n", cli->cl_dirty);
+ len = sprintf(buf, "%lu\n", cli->cl_dirty_pages << PAGE_SHIFT);
spin_unlock(&cli->cl_loi_list_lock);
return len;
@@ -583,6 +590,7 @@ static ssize_t max_pages_per_rpc_store(struct kobject *kobj,
}
spin_lock(&cli->cl_loi_list_lock);
cli->cl_max_pages_per_rpc = val;
+ client_adjust_max_dirty(cli);
spin_unlock(&cli->cl_loi_list_lock);
return count;
@@ -596,13 +604,14 @@ static ssize_t unstable_stats_show(struct kobject *kobj,
struct obd_device *dev = container_of(kobj, struct obd_device,
obd_kobj);
struct client_obd *cli = &dev->u.cli;
- int pages, mb;
+ long pages;
+ int mb;
- pages = atomic_read(&cli->cl_unstable_count);
+ pages = atomic_long_read(&cli->cl_unstable_count);
mb = (pages * PAGE_SIZE) >> 20;
- return sprintf(buf, "unstable_pages: %8d\n"
- "unstable_mb: %8d\n", pages, mb);
+ return sprintf(buf, "unstable_pages: %20ld\n"
+ "unstable_mb: %10d\n", pages, mb);
}
LUSTRE_RO_ATTR(unstable_stats);
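
A recurring theme in this file is the move from atomic_t to atomic_long_t for page counters, with printf formats widened to match. The motivation: with 4 KiB pages, a 32-bit page count wraps at 2^31 pages, i.e. 8 TiB of cached data, which big-memory clients can plausibly reach. A standalone illustration of that limit:

#include <stdio.h>
#include <stdint.h>

int main(void)
{
	uint64_t wrap_pages = (uint64_t)INT32_MAX + 1;	/* 2^31 pages */
	uint64_t wrap_bytes = wrap_pages << 12;		/* 4 KiB pages */

	/* prints "int page counters wrap at 8 TiB" */
	printf("int page counters wrap at %llu TiB\n",
	       (unsigned long long)(wrap_bytes >> 40));
	return 0;
}
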
diff --git a/drivers/staging/lustre/lustre/osc/osc_cache.c b/drivers/staging/lustre/lustre/osc/osc_cache.c
index d011135802d5..4bbe219add98 100644
--- a/drivers/staging/lustre/lustre/osc/osc_cache.c
+++ b/drivers/staging/lustre/lustre/osc/osc_cache.c
@@ -44,7 +44,7 @@ static int extent_debug; /* set it to be true for more debug */
static void osc_update_pending(struct osc_object *obj, int cmd, int delta);
static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
- int state);
+ enum osc_extent_state state);
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
struct osc_async_page *oap, int sent, int rc);
static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
@@ -177,7 +177,7 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
{
struct osc_object *obj = ext->oe_obj;
struct osc_async_page *oap;
- int page_count;
+ size_t page_count;
int rc = 0;
if (!osc_object_is_locked(obj)) {
@@ -632,7 +632,7 @@ static inline int overlapped(struct osc_extent *ex1, struct osc_extent *ex2)
*/
static struct osc_extent *osc_extent_find(const struct lu_env *env,
struct osc_object *obj, pgoff_t index,
- int *grants)
+ unsigned int *grants)
{
struct client_obd *cli = osc_cli(obj);
struct osc_lock *olck;
@@ -643,10 +643,10 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
struct osc_extent *found = NULL;
pgoff_t chunk;
pgoff_t max_end;
- int max_pages; /* max_pages_per_rpc */
- int chunksize;
+ unsigned int max_pages; /* max_pages_per_rpc */
+ unsigned int chunksize;
int ppc_bits; /* pages per chunk bits */
- int chunk_mask;
+ pgoff_t chunk_mask;
int rc;
cur = osc_extent_alloc(obj);
@@ -700,8 +700,8 @@ restart:
if (!ext)
ext = first_extent(obj);
while (ext) {
- loff_t ext_chk_start = ext->oe_start >> ppc_bits;
- loff_t ext_chk_end = ext->oe_end >> ppc_bits;
+ pgoff_t ext_chk_start = ext->oe_start >> ppc_bits;
+ pgoff_t ext_chk_end = ext->oe_end >> ppc_bits;
LASSERT(sanity_check_nolock(ext) == 0);
if (chunk > ext_chk_end + 1)
@@ -913,7 +913,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
return 0;
}
-static int extent_wait_cb(struct osc_extent *ext, int state)
+static int extent_wait_cb(struct osc_extent *ext, enum osc_extent_state state)
{
int ret;
@@ -928,7 +928,7 @@ static int extent_wait_cb(struct osc_extent *ext, int state)
* Wait for the extent's state to become @state.
*/
static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
- int state)
+ enum osc_extent_state state)
{
struct osc_object *obj = ext->oe_obj;
struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
@@ -958,8 +958,8 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), &lwi);
if (rc == -ETIMEDOUT) {
OSC_EXTENT_DUMP(D_ERROR, ext,
- "%s: wait ext to %d timedout, recovery in progress?\n",
- osc_export(obj)->exp_obd->obd_name, state);
+ "%s: wait ext to %u timedout, recovery in progress?\n",
+ osc_export(obj)->exp_obd->obd_name, state);
lwi = LWI_INTR(NULL, NULL);
rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
@@ -1099,7 +1099,7 @@ static int osc_extent_make_ready(const struct lu_env *env,
struct osc_async_page *oap;
struct osc_async_page *last = NULL;
struct osc_object *obj = ext->oe_obj;
- int page_count = 0;
+ unsigned int page_count = 0;
int rc;
/* we're going to grab page lock, so object lock must not be taken. */
@@ -1140,9 +1140,11 @@ static int osc_extent_make_ready(const struct lu_env *env,
* the size of file.
*/
if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
- last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
- LASSERT(last->oap_count > 0);
- LASSERT(last->oap_page_off + last->oap_count <= PAGE_SIZE);
+ int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
+
+ LASSERT(last_oap_count > 0);
+ LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
+ last->oap_count = last_oap_count;
spin_lock(&last->oap_lock);
last->oap_async_flags |= ASYNC_COUNT_STABLE;
spin_unlock(&last->oap_lock);
@@ -1174,7 +1176,8 @@ static int osc_extent_make_ready(const struct lu_env *env,
* called to expand the extent for the same IO. To expand the extent, the
* page index must be in the same or next chunk of ext->oe_end.
*/
-static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants)
+static int osc_extent_expand(struct osc_extent *ext, pgoff_t index,
+ unsigned int *grants)
{
struct osc_object *obj = ext->oe_obj;
struct client_obd *cli = osc_cli(obj);
@@ -1183,7 +1186,7 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants)
pgoff_t chunk = index >> ppc_bits;
pgoff_t end_chunk;
pgoff_t end_index;
- int chunksize = 1 << cli->cl_chunkbits;
+ unsigned int chunksize = 1 << cli->cl_chunkbits;
int rc = 0;
LASSERT(ext->oe_max_end >= index && ext->oe_start <= index);
@@ -1361,7 +1364,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
if (rc == 0 && srvlock) {
struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
- int bytes = oap->oap_count;
+ size_t bytes = oap->oap_count;
if (crt == CRT_READ)
stats->os_lockless_reads += bytes;
@@ -1383,18 +1386,16 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
#define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do { \
struct client_obd *__tmp = (cli); \
- CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d " \
- "unstable_pages: %d/%d dropped: %ld avail: %ld, " \
- "reserved: %ld, flight: %d } lru {in list: %d, " \
- "left: %d, waiters: %d }" fmt, \
+ CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %ld/%lu " \
+ "dropped: %ld avail: %ld, reserved: %ld, flight: %d }" \
+ "lru {in list: %ld, left: %ld, waiters: %d }" fmt "\n", \
__tmp->cl_import->imp_obd->obd_name, \
- __tmp->cl_dirty, __tmp->cl_dirty_max, \
- atomic_read(&obd_dirty_pages), obd_max_dirty_pages, \
- atomic_read(&obd_unstable_pages), obd_max_dirty_pages, \
+ __tmp->cl_dirty_pages, __tmp->cl_dirty_max_pages, \
+ atomic_long_read(&obd_dirty_pages), obd_max_dirty_pages, \
__tmp->cl_lost_grant, __tmp->cl_avail_grant, \
__tmp->cl_reserved_grant, __tmp->cl_w_in_flight, \
- atomic_read(&__tmp->cl_lru_in_list), \
- atomic_read(&__tmp->cl_lru_busy), \
+ atomic_long_read(&__tmp->cl_lru_in_list), \
+ atomic_long_read(&__tmp->cl_lru_busy), \
atomic_read(&__tmp->cl_lru_shrinkers), ##args); \
} while (0)
@@ -1404,8 +1405,8 @@ static void osc_consume_write_grant(struct client_obd *cli,
{
assert_spin_locked(&cli->cl_loi_list_lock);
LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
- atomic_inc(&obd_dirty_pages);
- cli->cl_dirty += PAGE_SIZE;
+ atomic_long_inc(&obd_dirty_pages);
+ cli->cl_dirty_pages++;
pga->flag |= OBD_BRW_FROM_GRANT;
CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
PAGE_SIZE, pga, pga->pg);
@@ -1424,12 +1425,12 @@ static void osc_release_write_grant(struct client_obd *cli,
}
pga->flag &= ~OBD_BRW_FROM_GRANT;
- atomic_dec(&obd_dirty_pages);
- cli->cl_dirty -= PAGE_SIZE;
+ atomic_long_dec(&obd_dirty_pages);
+ cli->cl_dirty_pages--;
if (pga->flag & OBD_BRW_NOCACHE) {
pga->flag &= ~OBD_BRW_NOCACHE;
- atomic_dec(&obd_dirty_transit_pages);
- cli->cl_dirty_transit -= PAGE_SIZE;
+ atomic_long_dec(&obd_dirty_transit_pages);
+ cli->cl_dirty_transit--;
}
}
@@ -1494,11 +1495,11 @@ static void osc_unreserve_grant(struct client_obd *cli,
static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
unsigned int lost_grant)
{
- int grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
+ unsigned long grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
spin_lock(&cli->cl_loi_list_lock);
- atomic_sub(nr_pages, &obd_dirty_pages);
- cli->cl_dirty -= nr_pages << PAGE_SHIFT;
+ atomic_long_sub(nr_pages, &obd_dirty_pages);
+ cli->cl_dirty_pages -= nr_pages;
cli->cl_lost_grant += lost_grant;
if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
/* borrow some grant from truncate to avoid the case that
@@ -1511,7 +1512,7 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
spin_unlock(&cli->cl_loi_list_lock);
CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu\n",
lost_grant, cli->cl_lost_grant,
- cli->cl_avail_grant, cli->cl_dirty);
+ cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_SHIFT);
}
/**
@@ -1535,19 +1536,18 @@ static int osc_enter_cache_try(struct client_obd *cli,
{
int rc;
- OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
+ OSC_DUMP_GRANT(D_CACHE, cli, "need:%d\n", bytes);
rc = osc_reserve_grant(cli, bytes);
if (rc < 0)
return 0;
- if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
- atomic_read(&obd_unstable_pages) + 1 +
- atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
+ if (cli->cl_dirty_pages <= cli->cl_dirty_max_pages &&
+ atomic_long_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
osc_consume_write_grant(cli, &oap->oap_brw_page);
if (transient) {
- cli->cl_dirty_transit += PAGE_SIZE;
- atomic_inc(&obd_dirty_transit_pages);
+ cli->cl_dirty_transit++;
+ atomic_long_inc(&obd_dirty_transit_pages);
oap->oap_brw_flags |= OBD_BRW_NOCACHE;
}
rc = 1;
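
The grant checks above now compare page counts directly (cl_dirty_pages against cl_dirty_max_pages, obd_dirty_pages against obd_max_dirty_pages) instead of mixing byte and page units. The convention used throughout the patch, sketched as a pair of hypothetical helpers (names are illustrative; the fields are those in the hunks above):

/* pages are the unit of account; bytes appear only at the reporting edges */
static inline unsigned long cli_dirty_bytes(const struct client_obd *cli)
{
	return cli->cl_dirty_pages << PAGE_SHIFT;	/* pages -> bytes */
}

static inline unsigned long cli_bytes_to_pages(unsigned long bytes)
{
	return bytes >> PAGE_SHIFT;			/* bytes -> pages */
}
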
@@ -1581,11 +1581,13 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
struct osc_object *osc = oap->oap_obj;
struct lov_oinfo *loi = osc->oo_oinfo;
struct osc_cache_waiter ocw;
- struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
- LWI_ON_SIGNAL_NOOP, NULL);
+ struct l_wait_info lwi;
int rc = -EDQUOT;
- OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
+ lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(AT_OFF ? obd_timeout : at_max),
+ NULL, LWI_ON_SIGNAL_NOOP, NULL);
+
+ OSC_DUMP_GRANT(D_CACHE, cli, "need:%d\n", bytes);
spin_lock(&cli->cl_loi_list_lock);
@@ -1593,14 +1595,16 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
* of queued writes and create a discontiguous rpc stream
*/
if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
- cli->cl_dirty_max < PAGE_SIZE ||
- cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) {
+ !cli->cl_dirty_max_pages || cli->cl_ar.ar_force_sync ||
+ loi->loi_ar.ar_force_sync) {
+ OSC_DUMP_GRANT(D_CACHE, cli, "forced sync i/o\n");
rc = -EDQUOT;
goto out;
}
/* Hopefully normal case - cache space and write credits available */
if (osc_enter_cache_try(cli, oap, bytes, 0)) {
+ OSC_DUMP_GRANT(D_CACHE, cli, "granted from cache\n");
rc = 0;
goto out;
}
@@ -1615,7 +1619,7 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
init_waitqueue_head(&ocw.ocw_waitq);
ocw.ocw_oap = oap;
ocw.ocw_grant = bytes;
- while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
+ while (cli->cl_dirty_pages > 0 || cli->cl_w_in_flight > 0) {
list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
ocw.ocw_rc = 0;
spin_unlock(&cli->cl_loi_list_lock);
@@ -1629,32 +1633,49 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
spin_lock(&cli->cl_loi_list_lock);
- /* l_wait_event is interrupted by signal, or timed out */
if (rc < 0) {
- if (rc == -ETIMEDOUT) {
- OSC_DUMP_GRANT(D_ERROR, cli,
- "try to reserve %d.\n", bytes);
- osc_extent_tree_dump(D_ERROR, osc);
- rc = -EDQUOT;
- }
-
+ /* l_wait_event is interrupted by signal, or timed out */
list_del_init(&ocw.ocw_entry);
- goto out;
+ break;
}
-
LASSERT(list_empty(&ocw.ocw_entry));
rc = ocw.ocw_rc;
if (rc != -EDQUOT)
- goto out;
+ break;
if (osc_enter_cache_try(cli, oap, bytes, 0)) {
rc = 0;
- goto out;
+ break;
}
}
+
+ switch (rc) {
+ case 0:
+ OSC_DUMP_GRANT(D_CACHE, cli, "finally got grant space\n");
+ break;
+ case -ETIMEDOUT:
+ OSC_DUMP_GRANT(D_CACHE, cli,
+ "timeout, fall back to sync i/o\n");
+ osc_extent_tree_dump(D_CACHE, osc);
+ /* fall back to synchronous I/O */
+ rc = -EDQUOT;
+ break;
+ case -EINTR:
+ /* Ensures restartability - LU-3581 */
+ OSC_DUMP_GRANT(D_CACHE, cli, "interrupted\n");
+ rc = -ERESTARTSYS;
+ break;
+ case -EDQUOT:
+ OSC_DUMP_GRANT(D_CACHE, cli,
+ "no grant space, fall back to sync i/o\n");
+ break;
+ default:
+ CDEBUG(D_CACHE, "%s: event for cache space @ %p never arrived due to %d, fall back to sync i/o\n",
+ cli->cl_import->imp_obd->obd_name, &ocw, rc);
+ break;
+ }
out:
spin_unlock(&cli->cl_loi_list_lock);
- OSC_DUMP_GRANT(D_CACHE, cli, "returned %d.\n", rc);
return rc;
}
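
The rewritten wait loop above breaks out instead of jumping straight to out, and a single switch then classifies the result. A condensed sketch of the mapping (error codes as in the hunk; not a drop-in replacement for the function):

static int classify_enter_cache_rc(int rc)
{
	switch (rc) {
	case 0:			/* grant obtained: cached write proceeds */
		return 0;
	case -ETIMEDOUT:	/* server slow or recovering */
	case -EDQUOT:		/* no grant to be had */
		return -EDQUOT;	/* caller falls back to sync i/o */
	case -EINTR:		/* signal: keep the syscall restartable */
		return -ERESTARTSYS;	/* LU-3581 */
	default:		/* wait machinery failed some other way */
		return rc;	/* also ends up as sync i/o */
	}
}
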
@@ -1670,19 +1691,17 @@ void osc_wake_cache_waiters(struct client_obd *cli)
ocw->ocw_rc = -EDQUOT;
/* we can't dirty more */
- if ((cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) ||
- (atomic_read(&obd_unstable_pages) + 1 +
- atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
- CDEBUG(D_CACHE, "no dirty room: dirty: %ld osc max %ld, sys max %d\n",
- cli->cl_dirty,
- cli->cl_dirty_max, obd_max_dirty_pages);
+ if ((cli->cl_dirty_pages > cli->cl_dirty_max_pages) ||
+ (atomic_long_read(&obd_dirty_pages) + 1 >
+ obd_max_dirty_pages)) {
+ CDEBUG(D_CACHE, "no dirty room: dirty: %ld osc max %ld, sys max %ld\n",
+ cli->cl_dirty_pages, cli->cl_dirty_max_pages,
+ obd_max_dirty_pages);
goto wakeup;
}
- ocw->ocw_rc = 0;
- if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
- ocw->ocw_rc = -EDQUOT;
-
+ if (osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
+ ocw->ocw_rc = 0;
wakeup:
CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
@@ -1843,97 +1862,6 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
ar->ar_force_sync = 0;
}
-/**
- * Performs "unstable" page accounting. This function balances the
- * increment operations performed in osc_inc_unstable_pages. It is
- * registered as the RPC request callback, and is executed when the
- * bulk RPC is committed on the server. Thus at this point, the pages
- * involved in the bulk transfer are no longer considered unstable.
- */
-void osc_dec_unstable_pages(struct ptlrpc_request *req)
-{
- struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
- struct ptlrpc_bulk_desc *desc = req->rq_bulk;
- int page_count = desc->bd_iov_count;
- int i;
-
- /* No unstable page tracking */
- if (!cli->cl_cache)
- return;
-
- LASSERT(page_count >= 0);
-
- for (i = 0; i < page_count; i++)
- dec_node_page_state(desc->bd_iov[i].kiov_page,
- NR_UNSTABLE_NFS);
-
- atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
- LASSERT(atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
-
- atomic_sub(page_count, &cli->cl_unstable_count);
- LASSERT(atomic_read(&cli->cl_unstable_count) >= 0);
-
- atomic_sub(page_count, &obd_unstable_pages);
- LASSERT(atomic_read(&obd_unstable_pages) >= 0);
-
- spin_lock(&req->rq_lock);
- req->rq_committed = 1;
- req->rq_unstable = 0;
- spin_unlock(&req->rq_lock);
-
- wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
-}
-
-/* "unstable" page accounting. See: osc_dec_unstable_pages. */
-void osc_inc_unstable_pages(struct ptlrpc_request *req)
-{
- struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
- struct ptlrpc_bulk_desc *desc = req->rq_bulk;
- long page_count = desc->bd_iov_count;
- int i;
-
- /* No unstable page tracking */
- if (!cli->cl_cache)
- return;
-
- LASSERT(page_count >= 0);
-
- for (i = 0; i < page_count; i++)
- inc_node_page_state(desc->bd_iov[i].kiov_page,
- NR_UNSTABLE_NFS);
-
- LASSERT(atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
- atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
-
- LASSERT(atomic_read(&cli->cl_unstable_count) >= 0);
- atomic_add(page_count, &cli->cl_unstable_count);
-
- LASSERT(atomic_read(&obd_unstable_pages) >= 0);
- atomic_add(page_count, &obd_unstable_pages);
-
- spin_lock(&req->rq_lock);
-
- /*
- * If the request has already been committed (i.e. brw_commit
- * called via rq_commit_cb), we need to undo the unstable page
- * increments we just performed because rq_commit_cb wont be
- * called again. Otherwise, just set the commit callback so the
- * unstable page accounting is properly updated when the request
- * is committed
- */
- if (req->rq_committed) {
- /* Drop lock before calling osc_dec_unstable_pages */
- spin_unlock(&req->rq_lock);
- osc_dec_unstable_pages(req);
- spin_lock(&req->rq_lock);
- } else {
- req->rq_unstable = 1;
- req->rq_commit_cb = osc_dec_unstable_pages;
- }
-
- spin_unlock(&req->rq_lock);
-}
-
/* this must be called holding the loi list lock to give coverage to exit_cache,
* async_flag maintenance, and oap_request
*/
@@ -1945,9 +1873,6 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
__u64 xid = 0;
if (oap->oap_request) {
- if (!rc)
- osc_inc_unstable_pages(oap->oap_request);
-
xid = ptlrpc_req_xid(oap->oap_request);
ptlrpc_req_finished(oap->oap_request);
oap->oap_request = NULL;
@@ -1979,7 +1904,7 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
*/
static int try_to_add_extent_for_io(struct client_obd *cli,
struct osc_extent *ext, struct list_head *rpclist,
- int *pc, unsigned int *max_pages)
+ unsigned int *pc, unsigned int *max_pages)
{
struct osc_extent *tmp;
struct osc_async_page *oap = list_first_entry(&ext->oe_pages,
@@ -2032,12 +1957,13 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
* 5. Traverse the extent tree from the 1st extent;
* 6. Above steps exit if there is no space in this RPC.
*/
-static int get_write_extents(struct osc_object *obj, struct list_head *rpclist)
+static unsigned int get_write_extents(struct osc_object *obj,
+ struct list_head *rpclist)
{
struct client_obd *cli = osc_cli(obj);
struct osc_extent *ext;
struct osc_extent *temp;
- int page_count = 0;
+ unsigned int page_count = 0;
unsigned int max_pages = cli->cl_max_pages_per_rpc;
LASSERT(osc_object_is_locked(obj));
@@ -2175,7 +2101,7 @@ osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli,
struct osc_extent *ext;
struct osc_extent *next;
LIST_HEAD(rpclist);
- int page_count = 0;
+ unsigned int page_count = 0;
unsigned int max_pages = cli->cl_max_pages_per_rpc;
int rc = 0;
@@ -2390,7 +2316,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
struct client_obd *cli = oap->oap_cli;
struct osc_object *osc = oap->oap_obj;
pgoff_t index;
- int grants = 0;
+ unsigned int grants = 0, tmp;
int brw_flags = OBD_BRW_ASYNC;
int cmd = OBD_BRW_WRITE;
int need_release = 0;
@@ -2434,9 +2360,6 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
return rc;
}
- if (osc_over_unstable_soft_limit(cli))
- brw_flags |= OBD_BRW_SOFT_SYNC;
-
oap->oap_cmd = cmd;
oap->oap_page_off = ops->ops_from;
oap->oap_count = ops->ops_to - ops->ops_from;
@@ -2476,7 +2399,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
grants = 0;
need_release = 1;
} else if (ext->oe_end < index) {
- int tmp = grants;
+ tmp = grants;
/* try to expand this extent */
rc = osc_extent_expand(ext, index, &tmp);
if (rc < 0) {
@@ -2501,7 +2424,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
}
if (!ext) {
- int tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
+ tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
/* try to find new extent to cover this page */
LASSERT(!oio->oi_active);
@@ -2645,7 +2568,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
goto out;
spin_lock(&oap->oap_lock);
- oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
+ oap->oap_async_flags |= ASYNC_READY | ASYNC_URGENT;
spin_unlock(&oap->oap_lock);
if (memory_pressure_get())
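
Grant arithmetic in this file is now consistently unsigned, matching the fact that a grant count never goes negative. The sizing used when a fresh extent is created, (1 << cl_chunkbits) + cl_extent_tax, worked through with illustrative values (64 KiB chunks and a 4 KiB tax are assumptions for the example, not values from this diff):

#include <stdio.h>

int main(void)
{
	unsigned int cl_chunkbits = 16;		/* 64 KiB chunks (assumed) */
	unsigned int cl_extent_tax = 4096;	/* 4 KiB per-extent tax (assumed) */
	unsigned int grants = (1U << cl_chunkbits) + cl_extent_tax;

	printf("grant reserved for a fresh extent: %u bytes\n", grants);
	return 0;				/* prints 69632 */
}
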
diff --git a/drivers/staging/lustre/lustre/osc/osc_cl_internal.h b/drivers/staging/lustre/lustre/osc/osc_cl_internal.h
index c8c3f1ca77be..9c8de15c309c 100644
--- a/drivers/staging/lustre/lustre/osc/osc_cl_internal.h
+++ b/drivers/staging/lustre/lustre/osc/osc_cl_internal.h
@@ -64,7 +64,7 @@ struct osc_io {
/** true if this io is lockless. */
unsigned int oi_lockless;
/** how many LRU pages are reserved for this IO */
- int oi_lru_reserved;
+ unsigned long oi_lru_reserved;
/** active extents, we know how many bytes is going to be written,
* so having an active extent will prevent it from being fragmented
@@ -389,7 +389,7 @@ extern struct lu_device_type osc_device_type;
extern struct lu_context_key osc_key;
extern struct lu_context_key osc_session_key;
-#define OSC_FLAGS (ASYNC_URGENT|ASYNC_READY)
+#define OSC_FLAGS (ASYNC_URGENT | ASYNC_READY)
int osc_lock_init(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
@@ -608,7 +608,7 @@ struct osc_extent {
/** link list of osc_object's oo_{hp|urgent|locking}_exts. */
struct list_head oe_link;
/** state of this extent */
- unsigned int oe_state;
+ enum osc_extent_state oe_state;
/** flags for this extent. */
unsigned int oe_intree:1,
/** 0 is write, 1 is read */
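
Typing oe_state as enum osc_extent_state instead of a plain unsigned int lets the compiler check switches over extent states. A self-contained illustration of the benefit; the state names here are stand-ins, not the enum from osc_cl_internal.h:

enum extent_state { ES_INV, ES_ACTIVE, ES_CACHE, ES_LOCKING };

static const char *extent_state_name(enum extent_state s)
{
	switch (s) {		/* -Wswitch warns if a state is missing */
	case ES_INV:		return "inv";
	case ES_ACTIVE:		return "active";
	case ES_CACHE:		return "cache";
	case ES_LOCKING:	return "locking";
	}
	return "unknown";
}
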
diff --git a/drivers/staging/lustre/lustre/osc/osc_internal.h b/drivers/staging/lustre/lustre/osc/osc_internal.h
index 7a27f0961955..67fe0a254991 100644
--- a/drivers/staging/lustre/lustre/osc/osc_internal.h
+++ b/drivers/staging/lustre/lustre/osc/osc_internal.h
@@ -71,7 +71,6 @@ struct osc_async_page {
struct client_obd *oap_cli;
struct osc_object *oap_obj;
- struct ldlm_lock *oap_ldlm_lock;
spinlock_t oap_lock;
};
@@ -134,9 +133,9 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg);
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
struct list_head *ext_list, int cmd);
-int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
- int target, bool force);
-int osc_lru_reclaim(struct client_obd *cli);
+long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
+ long target, bool force);
+long osc_lru_reclaim(struct client_obd *cli);
unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock);
@@ -198,7 +197,7 @@ int osc_quotacheck(struct obd_device *unused, struct obd_export *exp,
int osc_quota_poll_check(struct obd_export *exp, struct if_quotacheck *qchk);
void osc_inc_unstable_pages(struct ptlrpc_request *req);
void osc_dec_unstable_pages(struct ptlrpc_request *req);
-int osc_over_unstable_soft_limit(struct client_obd *cli);
+bool osc_over_unstable_soft_limit(struct client_obd *cli);
struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
struct osc_object *obj, pgoff_t index,
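
The osc_lru_shrink() and osc_lru_reclaim() prototypes widen from int to long for the same overflow reason as the counters: a shrink target or a freed-page total can exceed INT_MAX pages, and an int return would truncate it. A standalone demonstration of the truncation the new signatures avoid (assumes 64-bit long):

#include <stdio.h>

static int freed_as_int(long freed)	{ return (int)freed; }	/* old style */
static long freed_as_long(long freed)	{ return freed; }	/* new style */

int main(void)
{
	long freed = 3L << 30;	/* ~3.2 billion pages, above INT_MAX */

	printf("int: %d  long: %ld\n",
	       freed_as_int(freed), freed_as_long(freed));
	return 0;		/* the int result is garbage */
}
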
diff --git a/drivers/staging/lustre/lustre/osc/osc_io.c b/drivers/staging/lustre/lustre/osc/osc_io.c
index 6e3dcd38913f..8a559cbcdd0c 100644
--- a/drivers/staging/lustre/lustre/osc/osc_io.c
+++ b/drivers/staging/lustre/lustre/osc/osc_io.c
@@ -109,11 +109,11 @@ static int osc_io_submit(const struct lu_env *env,
struct cl_page_list *qin = &queue->c2_qin;
struct cl_page_list *qout = &queue->c2_qout;
- int queued = 0;
+ unsigned int queued = 0;
int result = 0;
int cmd;
int brw_flags;
- int max_pages;
+ unsigned int max_pages;
LASSERT(qin->pl_nr > 0);
@@ -163,14 +163,19 @@ static int osc_io_submit(const struct lu_env *env,
continue;
}
- cl_page_list_move(qout, qin, page);
spin_lock(&oap->oap_lock);
- oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
+ oap->oap_async_flags = ASYNC_URGENT | ASYNC_READY;
oap->oap_async_flags |= ASYNC_COUNT_STABLE;
spin_unlock(&oap->oap_lock);
osc_page_submit(env, opg, crt, brw_flags);
list_add_tail(&oap->oap_pending_item, &list);
+
+ if (page->cp_sync_io)
+ cl_page_list_move(qout, qin, page);
+ else /* async IO */
+ cl_page_list_del(env, qin, page);
+
if (++queued == max_pages) {
queued = 0;
result = osc_queue_sync_pages(env, osc, &list, cmd,
@@ -195,7 +200,7 @@ static int osc_io_submit(const struct lu_env *env,
* Expand stripe KMS if necessary.
*/
static void osc_page_touch_at(const struct lu_env *env,
- struct cl_object *obj, pgoff_t idx, unsigned to)
+ struct cl_object *obj, pgoff_t idx, size_t to)
{
struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
struct cl_attr *attr = &osc_env_info(env)->oti_attr;
@@ -228,7 +233,7 @@ static void osc_page_touch_at(const struct lu_env *env,
attr->cat_size = kms;
valid |= CAT_SIZE;
}
- cl_object_attr_set(env, obj, attr, valid);
+ cl_object_attr_update(env, obj, attr, valid);
cl_object_attr_unlock(obj);
}
@@ -314,8 +319,8 @@ static int osc_io_rw_iter_init(const struct lu_env *env,
struct osc_object *osc = cl2osc(ios->cis_obj);
struct client_obd *cli = osc_cli(osc);
unsigned long c;
- unsigned int npages;
- unsigned int max_pages;
+ unsigned long npages;
+ unsigned long max_pages;
if (cl_io_is_append(io))
return 0;
@@ -328,15 +333,15 @@ static int osc_io_rw_iter_init(const struct lu_env *env,
if (npages > max_pages)
npages = max_pages;
- c = atomic_read(cli->cl_lru_left);
+ c = atomic_long_read(cli->cl_lru_left);
if (c < npages && osc_lru_reclaim(cli) > 0)
- c = atomic_read(cli->cl_lru_left);
+ c = atomic_long_read(cli->cl_lru_left);
while (c >= npages) {
- if (c == atomic_cmpxchg(cli->cl_lru_left, c, c - npages)) {
+ if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
oio->oi_lru_reserved = npages;
break;
}
- c = atomic_read(cli->cl_lru_left);
+ c = atomic_long_read(cli->cl_lru_left);
}
return 0;
@@ -350,7 +355,7 @@ static void osc_io_rw_iter_fini(const struct lu_env *env,
struct client_obd *cli = osc_cli(osc);
if (oio->oi_lru_reserved > 0) {
- atomic_add(oio->oi_lru_reserved, cli->cl_lru_left);
+ atomic_long_add(oio->oi_lru_reserved, cli->cl_lru_left);
oio->oi_lru_reserved = 0;
}
oio->oi_write_osclock = NULL;
@@ -364,7 +369,7 @@ static int osc_io_fault_start(const struct lu_env *env,
io = ios->cis_io;
fio = &io->u.ci_fault;
- CDEBUG(D_INFO, "%lu %d %d\n",
+ CDEBUG(D_INFO, "%lu %d %zu\n",
fio->ft_index, fio->ft_writable, fio->ft_nob);
/*
* If mapping is writeable, adjust kms to cover this page,
@@ -471,18 +476,21 @@ static int osc_io_setattr_start(const struct lu_env *env,
attr->cat_ctime = lvb->lvb_ctime;
cl_valid |= CAT_CTIME;
}
- result = cl_object_attr_set(env, obj, attr, cl_valid);
+ result = cl_object_attr_update(env, obj, attr,
+ cl_valid);
}
cl_object_attr_unlock(obj);
}
memset(oa, 0, sizeof(*oa));
if (result == 0) {
oa->o_oi = loi->loi_oi;
+ obdo_set_parent_fid(oa, io->u.ci_setattr.sa_parent_fid);
+ oa->o_stripe_idx = io->u.ci_setattr.sa_stripe_index;
oa->o_mtime = attr->cat_mtime;
oa->o_atime = attr->cat_atime;
oa->o_ctime = attr->cat_ctime;
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
- OBD_MD_FLCTIME | OBD_MD_FLMTIME;
+ oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
+ OBD_MD_FLCTIME | OBD_MD_FLMTIME;
if (ia_valid & ATTR_SIZE) {
oa->o_size = size;
oa->o_blocks = OBD_OBJECT_EOF;
@@ -559,7 +567,7 @@ static int osc_io_read_start(const struct lu_env *env,
if (!slice->cis_io->ci_noatime) {
cl_object_attr_lock(obj);
attr->cat_atime = ktime_get_real_seconds();
- rc = cl_object_attr_set(env, obj, attr, CAT_ATIME);
+ rc = cl_object_attr_update(env, obj, attr, CAT_ATIME);
cl_object_attr_unlock(obj);
}
return rc;
@@ -576,7 +584,7 @@ static int osc_io_write_start(const struct lu_env *env,
cl_object_attr_lock(obj);
attr->cat_ctime = ktime_get_real_seconds();
attr->cat_mtime = attr->cat_ctime;
- rc = cl_object_attr_set(env, obj, attr, CAT_MTIME | CAT_CTIME);
+ rc = cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME);
cl_object_attr_unlock(obj);
return rc;
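
The LRU reservation in osc_io_rw_iter_init() above is a lock-free claim loop: read the shared budget, then try to compare-and-swap it down by the whole IO's page count, retrying on contention. The same pattern in self-contained C11 (a userspace rendering, not the kernel atomics used above):

#include <stdatomic.h>
#include <stdbool.h>

/* Claim 'want' slots from a shared budget, or fail without blocking. */
static bool reserve_slots(atomic_long *left, long want)
{
	long c = atomic_load(left);

	while (c >= want) {
		/* On failure, c is reloaded with the current value and we
		 * retry; on success the slots are ours.
		 */
		if (atomic_compare_exchange_weak(left, &c, c - want))
			return true;
	}
	return false;	/* budget too small: caller reclaims or waits */
}
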
diff --git a/drivers/staging/lustre/lustre/osc/osc_lock.c b/drivers/staging/lustre/lustre/osc/osc_lock.c
index 717d3ffb6789..39a8a5851603 100644
--- a/drivers/staging/lustre/lustre/osc/osc_lock.c
+++ b/drivers/staging/lustre/lustre/osc/osc_lock.c
@@ -222,7 +222,7 @@ static void osc_lock_lvb_update(const struct lu_env *env,
ldlm_lock_allow_match_locked(dlmlock);
}
- cl_object_attr_set(env, obj, attr, valid);
+ cl_object_attr_update(env, obj, attr, valid);
cl_object_attr_unlock(obj);
}
@@ -467,7 +467,7 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
*/
attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);
- cl_object_attr_set(env, obj, attr, CAT_KMS);
+ cl_object_attr_update(env, obj, attr, CAT_KMS);
cl_object_attr_unlock(obj);
unlock_res_and_lock(dlmlock);
diff --git a/drivers/staging/lustre/lustre/osc/osc_object.c b/drivers/staging/lustre/lustre/osc/osc_object.c
index d211d1905e83..aae3a2d4243f 100644
--- a/drivers/staging/lustre/lustre/osc/osc_object.c
+++ b/drivers/staging/lustre/lustre/osc/osc_object.c
@@ -159,8 +159,8 @@ static int osc_attr_get(const struct lu_env *env, struct cl_object *obj,
return 0;
}
-static int osc_attr_set(const struct lu_env *env, struct cl_object *obj,
- const struct cl_attr *attr, unsigned valid)
+static int osc_attr_update(const struct lu_env *env, struct cl_object *obj,
+ const struct cl_attr *attr, unsigned int valid)
{
struct lov_oinfo *oinfo = cl2osc(obj)->oo_oinfo;
struct ost_lvb *lvb = &oinfo->loi_lvb;
@@ -195,7 +195,6 @@ static int osc_object_glimpse(const struct lu_env *env,
static int osc_object_ast_clear(struct ldlm_lock *lock, void *data)
{
- LASSERT(lock->l_granted_mode == lock->l_req_mode);
if (lock->l_ast_data == data)
lock->l_ast_data = NULL;
return LDLM_ITER_CONTINUE;
@@ -262,7 +261,7 @@ static const struct cl_object_operations osc_ops = {
.coo_lock_init = osc_lock_init,
.coo_io_init = osc_io_init,
.coo_attr_get = osc_attr_get,
- .coo_attr_set = osc_attr_set,
+ .coo_attr_update = osc_attr_update,
.coo_glimpse = osc_object_glimpse,
.coo_prune = osc_object_prune
};
diff --git a/drivers/staging/lustre/lustre/osc/osc_page.c b/drivers/staging/lustre/lustre/osc/osc_page.c
index 355f496a2093..2a7a70aa9e80 100644
--- a/drivers/staging/lustre/lustre/osc/osc_page.c
+++ b/drivers/staging/lustre/lustre/osc/osc_page.c
@@ -323,32 +323,6 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj,
return result;
}
-int osc_over_unstable_soft_limit(struct client_obd *cli)
-{
- long obd_upages, obd_dpages, osc_upages;
-
- /* Can't check cli->cl_unstable_count, therefore, no soft limit */
- if (!cli)
- return 0;
-
- obd_upages = atomic_read(&obd_unstable_pages);
- obd_dpages = atomic_read(&obd_dirty_pages);
-
- osc_upages = atomic_read(&cli->cl_unstable_count);
-
- /*
- * obd_max_dirty_pages is the max number of (dirty + unstable)
- * pages allowed at any given time. To simulate an unstable page
- * only limit, we subtract the current number of dirty pages
- * from this max. This difference is roughly the amount of pages
- * currently available for unstable pages. Thus, the soft limit
- * is half of that difference. Check osc_upages to ensure we don't
- * set SOFT_SYNC for OSCs without any outstanding unstable pages.
- */
- return osc_upages &&
- obd_upages >= (obd_max_dirty_pages - obd_dpages) / 2;
-}
-
/**
* Helper function called by osc_io_submit() for every page in an immediate
* transfer (i.e., transferred synchronously).
@@ -368,9 +342,6 @@ void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
oap->oap_count = opg->ops_to - opg->ops_from;
oap->oap_brw_flags = brw_flags | OBD_BRW_SYNC;
- if (osc_over_unstable_soft_limit(oap->oap_cli))
- oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
-
if (capable(CFS_CAP_SYS_RESOURCE)) {
oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
oap->oap_cmd |= OBD_BRW_NOQUOTA;
@@ -409,7 +380,7 @@ static const int lru_shrink_max = 8 << (20 - PAGE_SHIFT); /* 8M */
static int osc_cache_too_much(struct client_obd *cli)
{
struct cl_client_cache *cache = cli->cl_cache;
- int pages = atomic_read(&cli->cl_lru_in_list);
+ long pages = atomic_long_read(&cli->cl_lru_in_list);
unsigned long budget;
budget = cache->ccc_lru_max / (atomic_read(&cache->ccc_users) - 2);
@@ -417,7 +388,7 @@ static int osc_cache_too_much(struct client_obd *cli)
/* if it's going to run out LRU slots, we should free some, but not
* too much to maintain fairness among OSCs.
*/
- if (atomic_read(cli->cl_lru_left) < cache->ccc_lru_max >> 4) {
+ if (atomic_long_read(cli->cl_lru_left) < cache->ccc_lru_max >> 4) {
if (pages >= budget)
return lru_shrink_max;
else if (pages >= budget / 2)
@@ -444,7 +415,7 @@ void osc_lru_add_batch(struct client_obd *cli, struct list_head *plist)
{
LIST_HEAD(lru);
struct osc_async_page *oap;
- int npages = 0;
+ long npages = 0;
list_for_each_entry(oap, plist, oap_pending_item) {
struct osc_page *opg = oap2osc_page(oap);
@@ -460,8 +431,8 @@ void osc_lru_add_batch(struct client_obd *cli, struct list_head *plist)
if (npages > 0) {
spin_lock(&cli->cl_lru_list_lock);
list_splice_tail(&lru, &cli->cl_lru_list);
- atomic_sub(npages, &cli->cl_lru_busy);
- atomic_add(npages, &cli->cl_lru_in_list);
+ atomic_long_sub(npages, &cli->cl_lru_busy);
+ atomic_long_add(npages, &cli->cl_lru_in_list);
spin_unlock(&cli->cl_lru_list_lock);
/* XXX: May set force to be true for better performance */
@@ -472,9 +443,9 @@ void osc_lru_add_batch(struct client_obd *cli, struct list_head *plist)
static void __osc_lru_del(struct client_obd *cli, struct osc_page *opg)
{
- LASSERT(atomic_read(&cli->cl_lru_in_list) > 0);
+ LASSERT(atomic_long_read(&cli->cl_lru_in_list) > 0);
list_del_init(&opg->ops_lru);
- atomic_dec(&cli->cl_lru_in_list);
+ atomic_long_dec(&cli->cl_lru_in_list);
}
/**
@@ -488,12 +459,12 @@ static void osc_lru_del(struct client_obd *cli, struct osc_page *opg)
if (!list_empty(&opg->ops_lru)) {
__osc_lru_del(cli, opg);
} else {
- LASSERT(atomic_read(&cli->cl_lru_busy) > 0);
- atomic_dec(&cli->cl_lru_busy);
+ LASSERT(atomic_long_read(&cli->cl_lru_busy) > 0);
+ atomic_long_dec(&cli->cl_lru_busy);
}
spin_unlock(&cli->cl_lru_list_lock);
- atomic_inc(cli->cl_lru_left);
+ atomic_long_inc(cli->cl_lru_left);
/* this is a great place to release more LRU pages if
* this osc occupies too many LRU pages and kernel is
* stealing one of them.
@@ -518,7 +489,7 @@ static void osc_lru_use(struct client_obd *cli, struct osc_page *opg)
spin_lock(&cli->cl_lru_list_lock);
__osc_lru_del(cli, opg);
spin_unlock(&cli->cl_lru_list_lock);
- atomic_inc(&cli->cl_lru_busy);
+ atomic_long_inc(&cli->cl_lru_busy);
}
}
@@ -540,10 +511,32 @@ static void discard_pagevec(const struct lu_env *env, struct cl_io *io,
}
/**
+ * Check if a cl_page can be released, i.e., it's not being used.
+ *
+ * If unstable page accounting is turned on, a bulk transfer may hold one
+ * refcount for recovery, so we need to check the vmpage refcount as well;
+ * otherwise, even if the cl_page can be destroyed, the corresponding
+ * vmpage can't be reused.
+ */
+static inline bool lru_page_busy(struct client_obd *cli, struct cl_page *page)
+{
+ if (cl_page_in_use_noref(page))
+ return true;
+
+ if (cli->cl_cache->ccc_unstable_check) {
+ struct page *vmpage = cl_page_vmpage(page);
+
+ /* a vmpage has two known users: the cl_page and the VM page cache */
+ if (page_count(vmpage) - page_mapcount(vmpage) > 2)
+ return true;
+ }
+ return false;
+}
+
+/**
* Drop @target of pages from LRU at most.
*/
-int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
- int target, bool force)
+long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
+ long target, bool force)
{
struct cl_io *io;
struct cl_object *clobj = NULL;
@@ -551,12 +544,12 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
struct osc_page *opg;
struct osc_page *temp;
int maxscan = 0;
- int count = 0;
+ long count = 0;
int index = 0;
int rc = 0;
- LASSERT(atomic_read(&cli->cl_lru_in_list) >= 0);
- if (atomic_read(&cli->cl_lru_in_list) == 0 || target <= 0)
+ LASSERT(atomic_long_read(&cli->cl_lru_in_list) >= 0);
+ if (atomic_long_read(&cli->cl_lru_in_list) == 0 || target <= 0)
return 0;
if (!force) {
@@ -575,7 +568,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
io = &osc_env_info(env)->oti_io;
spin_lock(&cli->cl_lru_list_lock);
- maxscan = min(target << 1, atomic_read(&cli->cl_lru_in_list));
+ maxscan = min(target << 1, atomic_long_read(&cli->cl_lru_in_list));
list_for_each_entry_safe(opg, temp, &cli->cl_lru_list, ops_lru) {
struct cl_page *page;
bool will_free = false;
@@ -584,7 +577,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
break;
page = opg->ops_cl.cpl_page;
- if (cl_page_in_use_noref(page)) {
+ if (lru_page_busy(cli, page)) {
list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
continue;
}
@@ -620,7 +613,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
}
if (cl_page_own_try(env, io, page) == 0) {
- if (!cl_page_in_use_noref(page)) {
+ if (!lru_page_busy(cli, page)) {
/* remove it from lru list earlier to avoid
* lock contention
*/
@@ -663,24 +656,19 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
atomic_dec(&cli->cl_lru_shrinkers);
if (count > 0) {
- atomic_add(count, cli->cl_lru_left);
+ atomic_long_add(count, cli->cl_lru_left);
wake_up_all(&osc_lru_waitq);
}
return count > 0 ? count : rc;
}
-static inline int max_to_shrink(struct client_obd *cli)
-{
- return min(atomic_read(&cli->cl_lru_in_list) >> 1, lru_shrink_max);
-}
-
-int osc_lru_reclaim(struct client_obd *cli)
+long osc_lru_reclaim(struct client_obd *cli)
{
struct cl_env_nest nest;
struct lu_env *env;
struct cl_client_cache *cache = cli->cl_cache;
int max_scans;
- int rc = 0;
+ long rc = 0;
LASSERT(cache);
@@ -693,15 +681,15 @@ int osc_lru_reclaim(struct client_obd *cli)
if (rc == -EBUSY)
rc = 0;
- CDEBUG(D_CACHE, "%s: Free %d pages from own LRU: %p.\n",
+ CDEBUG(D_CACHE, "%s: Free %ld pages from own LRU: %p.\n",
cli->cl_import->imp_obd->obd_name, rc, cli);
goto out;
}
- CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %d, busy: %d.\n",
+ CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %ld, busy: %ld.\n",
cli->cl_import->imp_obd->obd_name, cli,
- atomic_read(&cli->cl_lru_in_list),
- atomic_read(&cli->cl_lru_busy));
+ atomic_long_read(&cli->cl_lru_in_list),
+ atomic_long_read(&cli->cl_lru_busy));
/* Reclaim LRU slots from other client_obd as it can't free enough
* from its own. This should rarely happen.
@@ -717,10 +705,10 @@ int osc_lru_reclaim(struct client_obd *cli)
cli = list_entry(cache->ccc_lru.next, struct client_obd,
cl_lru_osc);
- CDEBUG(D_CACHE, "%s: cli %p LRU pages: %d, busy: %d.\n",
+ CDEBUG(D_CACHE, "%s: cli %p LRU pages: %ld, busy: %ld.\n",
cli->cl_import->imp_obd->obd_name, cli,
- atomic_read(&cli->cl_lru_in_list),
- atomic_read(&cli->cl_lru_busy));
+ atomic_long_read(&cli->cl_lru_in_list),
+ atomic_long_read(&cli->cl_lru_busy));
list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
if (osc_cache_too_much(cli) > 0) {
@@ -737,11 +725,18 @@ int osc_lru_reclaim(struct client_obd *cli)
out:
cl_env_nested_put(&nest, env);
- CDEBUG(D_CACHE, "%s: cli %p freed %d pages.\n",
+ CDEBUG(D_CACHE, "%s: cli %p freed %ld pages.\n",
cli->cl_import->imp_obd->obd_name, cli, rc);
return rc;
}
+/**
+ * osc_lru_reserve() is called to reserve an LRU slot for a cl_page.
+ *
+ * Usually the LRU slots are reserved in osc_io_rw_iter_init().
+ * Only when LRU slots are in extreme shortage does this per-page path
+ * need to reserve a slot that the IO could not reserve up front.
+ */
static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
struct osc_page *opg)
{
@@ -758,8 +753,8 @@ static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
goto out;
}
- LASSERT(atomic_read(cli->cl_lru_left) >= 0);
- while (!atomic_add_unless(cli->cl_lru_left, -1, 0)) {
+ LASSERT(atomic_long_read(cli->cl_lru_left) >= 0);
+ while (!atomic_long_add_unless(cli->cl_lru_left, -1, 0)) {
/* run out of LRU spaces, try to drop some by itself */
rc = osc_lru_reclaim(cli);
if (rc < 0)
@@ -770,7 +765,7 @@ static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
cond_resched();
rc = l_wait_event(osc_lru_waitq,
- atomic_read(cli->cl_lru_left) > 0,
+ atomic_long_read(cli->cl_lru_left) > 0,
&lwi);
if (rc < 0)
@@ -779,7 +774,7 @@ static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
out:
if (rc >= 0) {
- atomic_inc(&cli->cl_lru_busy);
+ atomic_long_inc(&cli->cl_lru_busy);
opg->ops_in_lru = 1;
rc = 0;
}
@@ -787,4 +782,151 @@ out:
return rc;
}
+/**
+ * Atomic operations are expensive. We batch the accounting for pages
+ * in the same pgdat to get better performance.
+ * In practice this works well because the pages in the same RPC are
+ * likely to come from the same node.
+ */
+static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ int factor)
+{
+ int page_count = desc->bd_iov_count;
+ pg_data_t *last = NULL;
+ int count = 0;
+ int i;
+
+ for (i = 0; i < page_count; i++) {
+ pg_data_t *pgdat = page_pgdat(desc->bd_iov[i].bv_page);
+
+ if (likely(pgdat == last)) {
+ ++count;
+ continue;
+ }
+
+ if (count > 0) {
+ mod_node_page_state(pgdat, NR_UNSTABLE_NFS,
+ factor * count);
+ count = 0;
+ }
+ last = pgdat;
+ ++count;
+ }
+ if (count > 0)
+ mod_node_page_state(last, NR_UNSTABLE_NFS, factor * count);
+}
+
+static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
+{
+ unstable_page_accounting(desc, 1);
+}
+
+static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
+{
+ unstable_page_accounting(desc, -1);
+}
+
+/**
+ * Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable.
+ *
+ * If this function is called, the request must have been committed
+ * or req::rq_unstable must have been set, which implies that the
+ * unstable statistics have already been added.
+ */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ int page_count = desc->bd_iov_count;
+ long unstable_count;
+
+ LASSERT(page_count >= 0);
+ dec_unstable_page_accounting(desc);
+
+ unstable_count = atomic_long_sub_return(page_count,
+ &cli->cl_unstable_count);
+ LASSERT(unstable_count >= 0);
+
+ unstable_count = atomic_long_sub_return(page_count,
+ &cli->cl_cache->ccc_unstable_nr);
+ LASSERT(unstable_count >= 0);
+ if (!unstable_count)
+ wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
+
+ if (osc_cache_too_much(cli))
+ (void)ptlrpcd_queue_work(cli->cl_lru_work);
+}
+
+/**
+ * "unstable" page accounting. See: osc_dec_unstable_pages.
+ */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ long page_count = desc->bd_iov_count;
+
+ /* No unstable page tracking */
+ if (!cli->cl_cache || !cli->cl_cache->ccc_unstable_check)
+ return;
+
+ add_unstable_page_accounting(desc);
+ atomic_long_add(page_count, &cli->cl_unstable_count);
+ atomic_long_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+ /*
+ * If the request has already been committed (i.e. brw_commit
+ * called via rq_commit_cb), we need to undo the unstable page
+ * increments we just performed because rq_commit_cb won't be
+ * called again.
+ */
+ spin_lock(&req->rq_lock);
+ if (unlikely(req->rq_committed)) {
+ spin_unlock(&req->rq_lock);
+
+ osc_dec_unstable_pages(req);
+ } else {
+ req->rq_unstable = 1;
+ spin_unlock(&req->rq_lock);
+ }
+}
+
+/**
+ * Check whether this OSC should piggyback a SOFT_SYNC flag to the OST.
+ * This function will be called by every BRW RPC so it's critical
+ * to make this function fast.
+ */
+bool osc_over_unstable_soft_limit(struct client_obd *cli)
+{
+ long unstable_nr, osc_unstable_count;
+
+ /* Can't check cli->cl_unstable_count, therefore, no soft limit */
+ if (!cli->cl_cache || !cli->cl_cache->ccc_unstable_check)
+ return false;
+
+ osc_unstable_count = atomic_long_read(&cli->cl_unstable_count);
+ unstable_nr = atomic_long_read(&cli->cl_cache->ccc_unstable_nr);
+
+ CDEBUG(D_CACHE,
+ "%s: cli: %p unstable pages: %lu, osc unstable pages: %lu\n",
+ cli->cl_import->imp_obd->obd_name, cli,
+ unstable_nr, osc_unstable_count);
+
+ /*
+ * If the LRU slots are in shortage - 25% remaining AND this OSC
+ * has one full RPC window of unstable pages, it's a good chance
+ * to piggyback a SOFT_SYNC flag.
+ * Please note that the OST won't respond to the SOFT_SYNC request
+ * immediately, so active OSCs will have more chances to carry the
+ * flag; this is reasonable.
+ */
+ return unstable_nr > cli->cl_cache->ccc_lru_max >> 2 &&
+ osc_unstable_count > cli->cl_max_pages_per_rpc *
+ cli->cl_max_rpcs_in_flight;
+}
+
/** @} osc */
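
unstable_page_accounting() above replaces the old page-at-a-time inc/dec_node_page_state() calls with one mod_node_page_state() per run of pages sharing a pgdat. The batching is run-length encoding over the bulk descriptor; the same idea in a standalone sketch (ints stand in for pg_data_t pointers):

#include <stdio.h>

static void flush_run(int node, int count)
{
	printf("node %d: NR_UNSTABLE_NFS += %d\n", node, count);
}

static void batch_accounting(const int *node_of_page, int n)
{
	int last = -1;
	int count = 0;

	for (int i = 0; i < n; i++) {
		if (node_of_page[i] == last) {
			count++;	/* extend the current run */
			continue;
		}
		if (count > 0)
			flush_run(last, count);	/* close the previous run */
		last = node_of_page[i];
		count = 1;
	}
	if (count > 0)
		flush_run(last, count);		/* final run */
}

int main(void)
{
	int nodes[] = { 0, 0, 0, 1, 1, 0 };

	batch_accounting(nodes, 6);	/* node 0 += 3, node 1 += 2, node 0 += 1 */
	return 0;
}
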
diff --git a/drivers/staging/lustre/lustre/osc/osc_request.c b/drivers/staging/lustre/lustre/osc/osc_request.c
index 536b868ff776..749781f022e2 100644
--- a/drivers/staging/lustre/lustre/osc/osc_request.c
+++ b/drivers/staging/lustre/lustre/osc/osc_request.c
@@ -41,6 +41,7 @@
#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
+#include "../include/lustre/lustre_ioctl.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
@@ -102,36 +103,6 @@ static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc);
-/* Pack OSC object metadata for disk storage (LE byte order). */
-static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
- struct lov_stripe_md *lsm)
-{
- int lmm_size;
-
- lmm_size = sizeof(**lmmp);
- if (!lmmp)
- return lmm_size;
-
- if (*lmmp && !lsm) {
- kfree(*lmmp);
- *lmmp = NULL;
- return 0;
- } else if (unlikely(lsm && ostid_id(&lsm->lsm_oi) == 0)) {
- return -EBADF;
- }
-
- if (!*lmmp) {
- *lmmp = kzalloc(lmm_size, GFP_NOFS);
- if (!*lmmp)
- return -ENOMEM;
- }
-
- if (lsm)
- ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
-
- return lmm_size;
-}
-
/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
struct lov_mds_md *lmm, int lmm_bytes)
@@ -189,7 +160,7 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
(imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
else
- (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+ (*lsmp)->lsm_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;
return lsm_size;
}
@@ -427,24 +398,16 @@ static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
oinfo->oi_cb_up, oinfo, rqset);
}
-static int osc_real_create(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md **ea,
- struct obd_trans_info *oti)
+static int osc_create(const struct lu_env *env, struct obd_export *exp,
+ struct obdo *oa, struct obd_trans_info *oti)
{
struct ptlrpc_request *req;
struct ost_body *body;
- struct lov_stripe_md *lsm;
int rc;
LASSERT(oa);
- LASSERT(ea);
-
- lsm = *ea;
- if (!lsm) {
- rc = obd_alloc_memmd(exp, &lsm);
- if (rc < 0)
- return rc;
- }
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+ LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
if (!req) {
@@ -490,21 +453,10 @@ static int osc_real_create(struct obd_export *exp, struct obdo *oa,
oa->o_blksize = cli_brw_size(exp->exp_obd);
oa->o_valid |= OBD_MD_FLBLKSZ;
- /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
- * have valid lsm_oinfo data structs, so don't go touching that.
- * This needs to be fixed in a big way.
- */
- lsm->lsm_oi = oa->o_oi;
- *ea = lsm;
-
- if (oti) {
- oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
-
- if (oa->o_valid & OBD_MD_FLCOOKIE) {
- if (!oti->oti_logcookies)
- oti_alloc_cookies(oti, 1);
- *oti->oti_logcookies = oa->o_lcookie;
- }
+ if (oti && oa->o_valid & OBD_MD_FLCOOKIE) {
+ if (!oti->oti_logcookies)
+ oti->oti_logcookies = &oti->oti_onecookie;
+ *oti->oti_logcookies = oa->o_lcookie;
}
CDEBUG(D_HA, "transno: %lld\n",
@@ -512,8 +464,6 @@ static int osc_real_create(struct obd_export *exp, struct obdo *oa,
out_req:
ptlrpc_req_finished(req);
out:
- if (rc && !*ea)
- obd_free_memmd(exp, &lsm);
return rc;
}
@@ -649,7 +599,7 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
ostid_build_res_name(&oa->o_oi, &res_id);
res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
- if (!res)
+ if (IS_ERR(res))
return 0;
LDLM_RESOURCE_ADDREF(res);
@@ -689,30 +639,6 @@ static int osc_can_send_destroy(struct client_obd *cli)
return 0;
}
-static int osc_create(const struct lu_env *env, struct obd_export *exp,
- struct obdo *oa, struct lov_stripe_md **ea,
- struct obd_trans_info *oti)
-{
- int rc = 0;
-
- LASSERT(oa);
- LASSERT(ea);
- LASSERT(oa->o_valid & OBD_MD_FLGROUP);
-
- if ((oa->o_valid & OBD_MD_FLFLAGS) &&
- oa->o_flags == OBD_FL_RECREATE_OBJS) {
- return osc_real_create(exp, oa, ea, oti);
- }
-
- if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
- return osc_real_create(exp, oa, ea, oti);
-
- /* we should not get here anymore */
- LBUG();
-
- return rc;
-}
-
/* Destroy requests can be async always on the client, and we don't even really
* care about the return code since the client cannot do anything at all about
* a destroy failure.
@@ -725,8 +651,7 @@ static int osc_create(const struct lu_env *env, struct obd_export *exp,
* cookies to the MDS after committing destroy transactions.
*/
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
- struct obdo *oa, struct lov_stripe_md *ea,
- struct obd_trans_info *oti, struct obd_export *md_export)
+ struct obdo *oa, struct obd_trans_info *oti)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
struct ptlrpc_request *req;
@@ -794,42 +719,44 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
long writing_bytes)
{
- u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
+ u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
LASSERT(!(oa->o_valid & bits));
oa->o_valid |= bits;
spin_lock(&cli->cl_loi_list_lock);
- oa->o_dirty = cli->cl_dirty;
- if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
- cli->cl_dirty_max)) {
+ oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
+ if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
+ cli->cl_dirty_max_pages)) {
CERROR("dirty %lu - %lu > dirty_max %lu\n",
- cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
+ cli->cl_dirty_pages, cli->cl_dirty_transit,
+ cli->cl_dirty_max_pages);
oa->o_undirty = 0;
- } else if (unlikely(atomic_read(&obd_unstable_pages) +
- atomic_read(&obd_dirty_pages) -
- atomic_read(&obd_dirty_transit_pages) >
- (long)(obd_max_dirty_pages + 1))) {
+ } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
+ atomic_long_read(&obd_dirty_transit_pages) >
+ (obd_max_dirty_pages + 1))) {
/* The atomic_read() allowing the atomic_inc() are
* not covered by a lock thus they may safely race and trip
* this CERROR() unless we add in a small fudge factor (+1).
*/
- CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
+ CERROR("%s: dirty %ld + %ld > system dirty_max %lu\n",
cli->cl_import->imp_obd->obd_name,
- atomic_read(&obd_unstable_pages),
- atomic_read(&obd_dirty_pages),
- atomic_read(&obd_dirty_transit_pages),
+ atomic_long_read(&obd_dirty_pages),
+ atomic_long_read(&obd_dirty_transit_pages),
obd_max_dirty_pages);
oa->o_undirty = 0;
- } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
+ } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
+ 0x7fffffff)) {
CERROR("dirty %lu - dirty_max %lu too big???\n",
- cli->cl_dirty, cli->cl_dirty_max);
+ cli->cl_dirty_pages, cli->cl_dirty_max_pages);
oa->o_undirty = 0;
} else {
- long max_in_flight = (cli->cl_max_pages_per_rpc <<
- PAGE_SHIFT)*
- (cli->cl_max_rpcs_in_flight + 1);
- oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
+ unsigned long max_in_flight;
+
+ max_in_flight = (cli->cl_max_pages_per_rpc << PAGE_SHIFT) *
+ (cli->cl_max_rpcs_in_flight + 1);
+ oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_SHIFT,
+ max_in_flight);
}
oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
oa->o_dropped = cli->cl_lost_grant;
@@ -1029,22 +956,24 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
/*
* ocd_grant is the total grant amount we're expect to hold: if we've
- * been evicted, it's the new avail_grant amount, cl_dirty will drop
- * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
+ * been evicted, it's the new avail_grant amount, cl_dirty_pages will
+ * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
+ * dirty.
*
* race is tolerable here: if we're evicted, but imp_state already
- * left EVICTED state, then cl_dirty must be 0 already.
+ * left EVICTED state, then cl_dirty_pages must be 0 already.
*/
spin_lock(&cli->cl_loi_list_lock);
if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
cli->cl_avail_grant = ocd->ocd_grant;
else
- cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
+ cli->cl_avail_grant = ocd->ocd_grant -
+ (cli->cl_dirty_pages << PAGE_SHIFT);
if (cli->cl_avail_grant < 0) {
CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
- ocd->ocd_grant, cli->cl_dirty);
+ ocd->ocd_grant, cli->cl_dirty_pages << PAGE_SHIFT);
/* workaround for servers which do not have the patch from
* LU-2679
*/
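
To make the new grant arithmetic concrete, a worked example under assumed numbers:

	/*
	 * Assumed: PAGE_SHIFT == 12 (4 KiB pages), import not evicted.
	 *
	 *   ocd_grant      = 8 << 20        -> 8 MiB granted by the server
	 *   cl_dirty_pages = 512            -> 512 << 12 = 2 MiB dirty
	 *   cl_avail_grant = 8 MiB - 2 MiB  -> 6 MiB
	 *
	 * In LUSTRE_IMP_EVICTED state avail_grant is the full ocd_grant,
	 * since cl_dirty_pages drains to 0 as in-flight RPCs fail out.
	 */
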
@@ -1181,7 +1110,7 @@ static u32 osc_checksum_bulk(int nob, u32 pg_count,
}
while (nob > 0 && pg_count > 0) {
- int count = pga[i]->count > nob ? nob : pga[i]->count;
+ unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
/* corrupt the data before we compute the checksum, to
* simulate an OST->client data error
@@ -1191,7 +1120,7 @@ static u32 osc_checksum_bulk(int nob, u32 pg_count,
unsigned char *ptr = kmap(pga[i]->pg);
int off = pga[i]->off & ~PAGE_MASK;
- memcpy(ptr + off, "bad1", min(4, nob));
+ memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
kunmap(pga[i]->pg);
}
cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
@@ -1335,11 +1264,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
if (i > 0 && can_merge_pages(pg_prev, pg)) {
niobuf--;
- niobuf->len += pg->count;
+ niobuf->rnb_len += pg->count;
} else {
- niobuf->offset = pg->off;
- niobuf->len = pg->count;
- niobuf->flags = pg->flag;
+ niobuf->rnb_offset = pg->off;
+ niobuf->rnb_len = pg->count;
+ niobuf->rnb_flags = pg->flag;
}
pg_prev = pg;
}
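
The merge branch above folds byte-contiguous pages into one remote niobuf. A sketch of the contiguity rule, assuming can_merge_pages() requires adjacency and identical flags (the real predicate may check more):

	/* Illustration only, not the patch's can_merge_pages(). */
	static bool niobufs_contiguous(const struct brw_page *prev,
				       const struct brw_page *next)
	{
		return prev->off + prev->count == next->off &&
		       prev->flag == next->flag;
	}
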
@@ -1418,6 +1347,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
INIT_LIST_HEAD(&aa->aa_oaps);
*reqp = req;
+ niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
+ CDEBUG(D_RPCTRACE, "brw rpc %p - object " DOSTID " offset %lld<>%lld\n",
+ req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
+ niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
+
return 0;
out:
@@ -1463,7 +1397,8 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
POSTID(&oa->o_oi), pga[0]->off,
- pga[page_count-1]->off + pga[page_count-1]->count - 1);
+ pga[page_count - 1]->off +
+ pga[page_count - 1]->count - 1);
CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
client_cksum, client_cksum_type,
server_cksum, cksum_type, new_cksum);
@@ -1565,7 +1500,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
char *router = "";
enum cksum_type cksum_type;
- cksum_type = cksum_type_unpack(body->oa.o_valid&OBD_MD_FLFLAGS ?
+ cksum_type = cksum_type_unpack(body->oa.o_valid &
+ OBD_MD_FLFLAGS ?
body->oa.o_flags : 0);
client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
aa->aa_ppga, OST_READ,
@@ -1794,7 +1730,8 @@ static int brw_interpret(const struct lu_env *env,
if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
- loff_t last_off = last->oap_count + last->oap_obj_off;
+ loff_t last_off = last->oap_count + last->oap_obj_off +
+ last->oap_page_off;
/* Change file size if this is an out of quota or
* direct IO write and it extends the file size
@@ -1812,11 +1749,14 @@ static int brw_interpret(const struct lu_env *env,
}
if (valid != 0)
- cl_object_attr_set(env, obj, attr, valid);
+ cl_object_attr_update(env, obj, attr, valid);
cl_object_attr_unlock(obj);
}
kmem_cache_free(obdo_cachep, aa->aa_oa);
+ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
+ osc_inc_unstable_pages(req);
+
list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
list_del_init(&ext->oe_link);
osc_extent_finish(env, ext, 1, rc);
@@ -1847,21 +1787,21 @@ static int brw_interpret(const struct lu_env *env,
static void brw_commit(struct ptlrpc_request *req)
{
- spin_lock(&req->rq_lock);
/*
* If osc_inc_unstable_pages (via osc_extent_finish) races with
* this function, called via rq_commit_cb, we need to ensure
* osc_dec_unstable_pages is still called. Otherwise unstable
* pages may be leaked.
*/
- if (req->rq_unstable) {
+ spin_lock(&req->rq_lock);
+ if (unlikely(req->rq_unstable)) {
+ req->rq_unstable = 0;
spin_unlock(&req->rq_lock);
osc_dec_unstable_pages(req);
- spin_lock(&req->rq_lock);
} else {
req->rq_committed = 1;
+ spin_unlock(&req->rq_lock);
}
- spin_unlock(&req->rq_lock);
}
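
For context, the producing side of this handshake has to take the same lock and honor rq_committed. A rough sketch of that shape; note_unstable_pages() is a hypothetical name, and only rq_lock/rq_committed/rq_unstable and osc_dec_unstable_pages() come from the patch:

	static void note_unstable_pages(struct ptlrpc_request *req)
	{
		spin_lock(&req->rq_lock);
		if (req->rq_committed) {
			/* brw_commit() already ran: drop the pages now */
			spin_unlock(&req->rq_lock);
			osc_dec_unstable_pages(req);
		} else {
			/* tell brw_commit() to drop the pages later */
			req->rq_unstable = 1;
			spin_unlock(&req->rq_lock);
		}
	}
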
/**
@@ -1881,13 +1821,13 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
struct osc_async_page *tmp;
struct cl_req *clerq = NULL;
enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
- struct ldlm_lock *lock = NULL;
struct cl_req_attr *crattr = NULL;
u64 starting_offset = OBD_OBJECT_EOF;
u64 ending_offset = 0;
int mpflag = 0;
int mem_tight = 0;
int page_count = 0;
+ bool soft_sync = false;
int i;
int rc;
struct ost_body *body;
@@ -1915,6 +1855,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
}
}
+ soft_sync = osc_over_unstable_soft_limit(cli);
if (mem_tight)
mpflag = cfs_memory_pressure_get_and_set();
@@ -1947,10 +1888,11 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
rc = PTR_ERR(clerq);
goto out;
}
- lock = oap->oap_ldlm_lock;
}
if (mem_tight)
oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
+ if (soft_sync)
+ oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
pga[i] = &oap->oap_brw_page;
pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
@@ -1964,10 +1906,6 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
LASSERT(clerq);
crattr->cra_oa = oa;
cl_req_attr_set(env, clerq, crattr, ~0ULL);
- if (lock) {
- oa->o_handle = lock->l_remote_handle;
- oa->o_valid |= OBD_MD_FLHANDLE;
- }
rc = cl_req_prep(env, clerq);
if (rc != 0) {
@@ -1998,7 +1936,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
crattr->cra_oa = &body->oa;
cl_req_attr_set(env, clerq, crattr,
- OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
+ OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
@@ -2044,7 +1982,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
}
spin_unlock(&cli->cl_loi_list_lock);
- DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
+ DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
page_count, aa, cli->cl_r_in_flight,
cli->cl_w_in_flight);
@@ -2116,27 +2054,6 @@ static int osc_set_data_with_check(struct lustre_handle *lockh,
return set;
}
-/* find any ldlm lock of the inode in osc
- * return 0 not find
- * 1 find one
- * < 0 error
- */
-static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
- ldlm_iterator_t replace, void *data)
-{
- struct ldlm_res_id res_id;
- struct obd_device *obd = class_exp2obd(exp);
- int rc = 0;
-
- ostid_build_res_name(&lsm->lsm_oi, &res_id);
- rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
- if (rc == LDLM_ITER_STOP)
- return 1;
- if (rc == LDLM_ITER_CONTINUE)
- return 0;
- return rc;
-}
-
static int osc_enqueue_fini(struct ptlrpc_request *req,
osc_enqueue_upcall_f upcall, void *cookie,
struct lustre_handle *lockh, enum ldlm_mode mode,
@@ -2586,71 +2503,6 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
return rc;
}
-/* Retrieve object striping information.
- *
- * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
- * the maximum number of OST indices which will fit in the user buffer.
- * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
- */
-static int osc_getstripe(struct lov_stripe_md *lsm,
- struct lov_user_md __user *lump)
-{
- /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
- struct lov_user_md_v3 lum, *lumk;
- struct lov_user_ost_data_v1 *lmm_objects;
- int rc = 0, lum_size;
-
- if (!lsm)
- return -ENODATA;
-
- /* we only need the header part from user space to get lmm_magic and
- * lmm_stripe_count, (the header part is common to v1 and v3)
- */
- lum_size = sizeof(struct lov_user_md_v1);
- if (copy_from_user(&lum, lump, lum_size))
- return -EFAULT;
-
- if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
- (lum.lmm_magic != LOV_USER_MAGIC_V3))
- return -EINVAL;
-
- /* lov_user_md_vX and lov_mds_md_vX must have the same size */
- LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
- LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
- LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
-
- /* we can use lov_mds_md_size() to compute lum_size
- * because lov_user_md_vX and lov_mds_md_vX have the same size
- */
- if (lum.lmm_stripe_count > 0) {
- lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
- lumk = kzalloc(lum_size, GFP_NOFS);
- if (!lumk)
- return -ENOMEM;
-
- if (lum.lmm_magic == LOV_USER_MAGIC_V1)
- lmm_objects =
- &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
- else
- lmm_objects = &(lumk->lmm_objects[0]);
- lmm_objects->l_ost_oi = lsm->lsm_oi;
- } else {
- lum_size = lov_mds_md_size(0, lum.lmm_magic);
- lumk = &lum;
- }
-
- lumk->lmm_oi = lsm->lsm_oi;
- lumk->lmm_stripe_count = 1;
-
- if (copy_to_user(lump, lumk, lum_size))
- rc = -EFAULT;
-
- if (lumk != &lum)
- kfree(lumk);
-
- return rc;
-}
-
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
void *karg, void __user *uarg)
{
@@ -2664,57 +2516,6 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
return -EINVAL;
}
switch (cmd) {
- case OBD_IOC_LOV_GET_CONFIG: {
- char *buf;
- struct lov_desc *desc;
- struct obd_uuid uuid;
-
- buf = NULL;
- len = 0;
- if (obd_ioctl_getdata(&buf, &len, uarg)) {
- err = -EINVAL;
- goto out;
- }
-
- data = (struct obd_ioctl_data *)buf;
-
- if (sizeof(*desc) > data->ioc_inllen1) {
- obd_ioctl_freedata(buf, len);
- err = -EINVAL;
- goto out;
- }
-
- if (data->ioc_inllen2 < sizeof(uuid)) {
- obd_ioctl_freedata(buf, len);
- err = -EINVAL;
- goto out;
- }
-
- desc = (struct lov_desc *)data->ioc_inlbuf1;
- desc->ld_tgt_count = 1;
- desc->ld_active_tgt_count = 1;
- desc->ld_default_stripe_count = 1;
- desc->ld_default_stripe_size = 0;
- desc->ld_default_stripe_offset = 0;
- desc->ld_pattern = 0;
- memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
-
- memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
-
- err = copy_to_user(uarg, buf, len);
- if (err)
- err = -EFAULT;
- obd_ioctl_freedata(buf, len);
- goto out;
- }
- case LL_IOC_LOV_SETSTRIPE:
- err = obd_alloc_memmd(exp, karg);
- if (err > 0)
- err = 0;
- goto out;
- case LL_IOC_LOV_GETSTRIPE:
- err = osc_getstripe(karg, uarg);
- goto out;
case OBD_IOC_CLIENT_RECOVER:
err = ptlrpc_recover_import(obd->u.cli.cl_import,
data->ioc_inlbuf1, 0);
@@ -2749,51 +2550,7 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
if (!vallen || !val)
return -EFAULT;
- if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
- __u32 *stripe = val;
- *vallen = sizeof(*stripe);
- *stripe = 0;
- return 0;
- } else if (KEY_IS(KEY_LAST_ID)) {
- struct ptlrpc_request *req;
- u64 *reply;
- char *tmp;
- int rc;
-
- req = ptlrpc_request_alloc(class_exp2cliimp(exp),
- &RQF_OST_GET_INFO_LAST_ID);
- if (!req)
- return -ENOMEM;
-
- req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
- RCL_CLIENT, keylen);
- rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
- if (rc) {
- ptlrpc_request_free(req);
- return rc;
- }
-
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
- memcpy(tmp, key, keylen);
-
- req->rq_no_delay = 1;
- req->rq_no_resend = 1;
- ptlrpc_request_set_replen(req);
- rc = ptlrpc_queue_wait(req);
- if (rc)
- goto out;
-
- reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
- if (!reply) {
- rc = -EPROTO;
- goto out;
- }
-
- *((u64 *)val) = *reply;
-out:
- ptlrpc_req_finished(req);
- return rc;
- } else if (KEY_IS(KEY_FIEMAP)) {
+ if (KEY_IS(KEY_FIEMAP)) {
struct ll_fiemap_info_key *fm_key = key;
struct ldlm_res_id res_id;
ldlm_policy_data_t policy;
@@ -2931,11 +2688,11 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
struct client_obd *cli = &obd->u.cli;
- int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
- int target = *(int *)val;
+ long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
+ long target = *(long *)val;
nr = osc_lru_shrink(env, cli, min(nr, target), true);
- *(int *)val -= nr;
+ *(long *)val -= nr;
return 0;
}
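
With the int -> long switch above, callers of KEY_CACHE_LRU_SHRINK must pass a long-backed value. An assumed call shape (the llite call site is paraphrased, not quoted; npages_to_free is a hypothetical variable):

	long want = npages_to_free;
	int rc;

	rc = obd_set_info_async(env, exp, sizeof(KEY_CACHE_LRU_SHRINK),
				KEY_CACHE_LRU_SHRINK, sizeof(want), &want,
				NULL);
	/* on return, want has been decremented by the pages shrunk */
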
@@ -3014,8 +2771,9 @@ static int osc_reconnect(const struct lu_env *env,
long lost_grant;
spin_lock(&cli->cl_loi_list_lock);
- data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
- 2 * cli_brw_size(obd);
+ data->ocd_grant = (cli->cl_avail_grant +
+ (cli->cl_dirty_pages << PAGE_SHIFT)) ?:
+ 2 * cli_brw_size(obd);
lost_grant = cli->cl_lost_grant;
cli->cl_lost_grant = 0;
spin_unlock(&cli->cl_loi_list_lock);
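
A worked example of the reconnect grant request under assumed numbers:

	/*
	 * Assumed: cl_avail_grant = 4 MiB, cl_dirty_pages = 256,
	 * PAGE_SHIFT == 12 (so dirty bytes = 1 MiB).
	 *
	 *   ocd_grant = 4 MiB + 1 MiB = 5 MiB
	 *
	 * If both terms are 0, the ?: falls back to 2 * cli_brw_size(obd),
	 * so a fresh client still requests a usable amount of grant.
	 */
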
@@ -3346,7 +3104,6 @@ static struct obd_ops osc_obd_ops = {
.disconnect = osc_disconnect,
.statfs = osc_statfs,
.statfs_async = osc_statfs_async,
- .packmd = osc_packmd,
.unpackmd = osc_unpackmd,
.create = osc_create,
.destroy = osc_destroy,
@@ -3354,7 +3111,6 @@ static struct obd_ops osc_obd_ops = {
.getattr_async = osc_getattr_async,
.setattr = osc_setattr,
.setattr_async = osc_setattr_async,
- .find_cbdata = osc_find_cbdata,
.iocontrol = osc_iocontrol,
.get_info = osc_get_info,
.set_info_async = osc_set_info_async,