Diffstat (limited to 'drivers/staging/lustre/lustre/osc')
-rw-r--r-- | drivers/staging/lustre/lustre/osc/lproc_osc.c | 41
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_cache.c | 279
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_cl_internal.h | 6
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_internal.h | 9
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_io.c | 46
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_lock.c | 4
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_object.c | 7
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_page.c | 278
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_request.c | 400
9 files changed, 453 insertions(+), 617 deletions(-)
diff --git a/drivers/staging/lustre/lustre/osc/lproc_osc.c b/drivers/staging/lustre/lustre/osc/lproc_osc.c index 7e83d395b998..f0062d44ee03 100644 --- a/drivers/staging/lustre/lustre/osc/lproc_osc.c +++ b/drivers/staging/lustre/lustre/osc/lproc_osc.c @@ -119,6 +119,7 @@ static ssize_t max_rpcs_in_flight_store(struct kobject *kobj, spin_lock(&cli->cl_loi_list_lock); cli->cl_max_rpcs_in_flight = val; + client_adjust_max_dirty(cli); spin_unlock(&cli->cl_loi_list_lock); return count; @@ -136,10 +137,10 @@ static ssize_t max_dirty_mb_show(struct kobject *kobj, int mult; spin_lock(&cli->cl_loi_list_lock); - val = cli->cl_dirty_max; + val = cli->cl_dirty_max_pages; spin_unlock(&cli->cl_loi_list_lock); - mult = 1 << 20; + mult = 1 << (20 - PAGE_SHIFT); return lprocfs_read_frac_helper(buf, PAGE_SIZE, val, mult); } @@ -166,7 +167,7 @@ static ssize_t max_dirty_mb_store(struct kobject *kobj, return -ERANGE; spin_lock(&cli->cl_loi_list_lock); - cli->cl_dirty_max = (u32)(pages_number << PAGE_SHIFT); + cli->cl_dirty_max_pages = pages_number; osc_wake_cache_waiters(cli); spin_unlock(&cli->cl_loi_list_lock); @@ -181,11 +182,11 @@ static int osc_cached_mb_seq_show(struct seq_file *m, void *v) int shift = 20 - PAGE_SHIFT; seq_printf(m, - "used_mb: %d\n" - "busy_cnt: %d\n", - (atomic_read(&cli->cl_lru_in_list) + - atomic_read(&cli->cl_lru_busy)) >> shift, - atomic_read(&cli->cl_lru_busy)); + "used_mb: %ld\n" + "busy_cnt: %ld\n", + (atomic_long_read(&cli->cl_lru_in_list) + + atomic_long_read(&cli->cl_lru_busy)) >> shift, + atomic_long_read(&cli->cl_lru_busy)); return 0; } @@ -197,8 +198,10 @@ static ssize_t osc_cached_mb_seq_write(struct file *file, { struct obd_device *dev = ((struct seq_file *)file->private_data)->private; struct client_obd *cli = &dev->u.cli; - int pages_number, mult, rc; + long pages_number, rc; char kernbuf[128]; + int mult; + u64 val; if (count >= sizeof(kernbuf)) return -EINVAL; @@ -210,14 +213,18 @@ static ssize_t osc_cached_mb_seq_write(struct file *file, mult = 1 << (20 - PAGE_SHIFT); buffer += lprocfs_find_named_value(kernbuf, "used_mb:", &count) - kernbuf; - rc = lprocfs_write_frac_helper(buffer, count, &pages_number, mult); + rc = lprocfs_write_frac_u64_helper(buffer, count, &val, mult); if (rc) return rc; + if (val > LONG_MAX) + return -ERANGE; + pages_number = (long)val; + if (pages_number < 0) return -ERANGE; - rc = atomic_read(&cli->cl_lru_in_list) - pages_number; + rc = atomic_long_read(&cli->cl_lru_in_list) - pages_number; if (rc > 0) { struct lu_env *env; int refcheck; @@ -244,7 +251,7 @@ static ssize_t cur_dirty_bytes_show(struct kobject *kobj, int len; spin_lock(&cli->cl_loi_list_lock); - len = sprintf(buf, "%lu\n", cli->cl_dirty); + len = sprintf(buf, "%lu\n", cli->cl_dirty_pages << PAGE_SHIFT); spin_unlock(&cli->cl_loi_list_lock); return len; @@ -583,6 +590,7 @@ static ssize_t max_pages_per_rpc_store(struct kobject *kobj, } spin_lock(&cli->cl_loi_list_lock); cli->cl_max_pages_per_rpc = val; + client_adjust_max_dirty(cli); spin_unlock(&cli->cl_loi_list_lock); return count; @@ -596,13 +604,14 @@ static ssize_t unstable_stats_show(struct kobject *kobj, struct obd_device *dev = container_of(kobj, struct obd_device, obd_kobj); struct client_obd *cli = &dev->u.cli; - int pages, mb; + long pages; + int mb; - pages = atomic_read(&cli->cl_unstable_count); + pages = atomic_long_read(&cli->cl_unstable_count); mb = (pages * PAGE_SIZE) >> 20; - return sprintf(buf, "unstable_pages: %8d\n" - "unstable_mb: %8d\n", pages, mb); + return sprintf(buf, "unstable_pages: %20ld\n" + 
"unstable_mb: %10d\n", pages, mb); } LUSTRE_RO_ATTR(unstable_stats); diff --git a/drivers/staging/lustre/lustre/osc/osc_cache.c b/drivers/staging/lustre/lustre/osc/osc_cache.c index d011135802d5..4bbe219add98 100644 --- a/drivers/staging/lustre/lustre/osc/osc_cache.c +++ b/drivers/staging/lustre/lustre/osc/osc_cache.c @@ -44,7 +44,7 @@ static int extent_debug; /* set it to be true for more debug */ static void osc_update_pending(struct osc_object *obj, int cmd, int delta); static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext, - int state); + enum osc_extent_state state); static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, struct osc_async_page *oap, int sent, int rc); static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap, @@ -177,7 +177,7 @@ static int osc_extent_sanity_check0(struct osc_extent *ext, { struct osc_object *obj = ext->oe_obj; struct osc_async_page *oap; - int page_count; + size_t page_count; int rc = 0; if (!osc_object_is_locked(obj)) { @@ -632,7 +632,7 @@ static inline int overlapped(struct osc_extent *ex1, struct osc_extent *ex2) */ static struct osc_extent *osc_extent_find(const struct lu_env *env, struct osc_object *obj, pgoff_t index, - int *grants) + unsigned int *grants) { struct client_obd *cli = osc_cli(obj); struct osc_lock *olck; @@ -643,10 +643,10 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env, struct osc_extent *found = NULL; pgoff_t chunk; pgoff_t max_end; - int max_pages; /* max_pages_per_rpc */ - int chunksize; + unsigned int max_pages; /* max_pages_per_rpc */ + unsigned int chunksize; int ppc_bits; /* pages per chunk bits */ - int chunk_mask; + pgoff_t chunk_mask; int rc; cur = osc_extent_alloc(obj); @@ -700,8 +700,8 @@ restart: if (!ext) ext = first_extent(obj); while (ext) { - loff_t ext_chk_start = ext->oe_start >> ppc_bits; - loff_t ext_chk_end = ext->oe_end >> ppc_bits; + pgoff_t ext_chk_start = ext->oe_start >> ppc_bits; + pgoff_t ext_chk_end = ext->oe_end >> ppc_bits; LASSERT(sanity_check_nolock(ext) == 0); if (chunk > ext_chk_end + 1) @@ -913,7 +913,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext, return 0; } -static int extent_wait_cb(struct osc_extent *ext, int state) +static int extent_wait_cb(struct osc_extent *ext, enum osc_extent_state state) { int ret; @@ -928,7 +928,7 @@ static int extent_wait_cb(struct osc_extent *ext, int state) * Wait for the extent's state to become @state. 
*/ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext, - int state) + enum osc_extent_state state) { struct osc_object *obj = ext->oe_obj; struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL, @@ -958,8 +958,8 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext, rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), &lwi); if (rc == -ETIMEDOUT) { OSC_EXTENT_DUMP(D_ERROR, ext, - "%s: wait ext to %d timedout, recovery in progress?\n", - osc_export(obj)->exp_obd->obd_name, state); + "%s: wait ext to %u timedout, recovery in progress?\n", + osc_export(obj)->exp_obd->obd_name, state); lwi = LWI_INTR(NULL, NULL); rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), @@ -1099,7 +1099,7 @@ static int osc_extent_make_ready(const struct lu_env *env, struct osc_async_page *oap; struct osc_async_page *last = NULL; struct osc_object *obj = ext->oe_obj; - int page_count = 0; + unsigned int page_count = 0; int rc; /* we're going to grab page lock, so object lock must not be taken. */ @@ -1140,9 +1140,11 @@ static int osc_extent_make_ready(const struct lu_env *env, * the size of file. */ if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) { - last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE); - LASSERT(last->oap_count > 0); - LASSERT(last->oap_page_off + last->oap_count <= PAGE_SIZE); + int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE); + + LASSERT(last_oap_count > 0); + LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE); + last->oap_count = last_oap_count; spin_lock(&last->oap_lock); last->oap_async_flags |= ASYNC_COUNT_STABLE; spin_unlock(&last->oap_lock); @@ -1174,7 +1176,8 @@ static int osc_extent_make_ready(const struct lu_env *env, * called to expand the extent for the same IO. To expand the extent, the * page index must be in the same or next chunk of ext->oe_end. */ -static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants) +static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, + unsigned int *grants) { struct osc_object *obj = ext->oe_obj; struct client_obd *cli = osc_cli(obj); @@ -1183,7 +1186,7 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants) pgoff_t chunk = index >> ppc_bits; pgoff_t end_chunk; pgoff_t end_index; - int chunksize = 1 << cli->cl_chunkbits; + unsigned int chunksize = 1 << cli->cl_chunkbits; int rc = 0; LASSERT(ext->oe_max_end >= index && ext->oe_start <= index); @@ -1361,7 +1364,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, if (rc == 0 && srvlock) { struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev; struct osc_stats *stats = &lu2osc_dev(ld)->od_stats; - int bytes = oap->oap_count; + size_t bytes = oap->oap_count; if (crt == CRT_READ) stats->os_lockless_reads += bytes; @@ -1383,18 +1386,16 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) 
do { \ struct client_obd *__tmp = (cli); \ - CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d " \ - "unstable_pages: %d/%d dropped: %ld avail: %ld, " \ - "reserved: %ld, flight: %d } lru {in list: %d, " \ - "left: %d, waiters: %d }" fmt, \ + CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %ld/%lu " \ + "dropped: %ld avail: %ld, reserved: %ld, flight: %d }" \ + "lru {in list: %ld, left: %ld, waiters: %d }" fmt "\n", \ __tmp->cl_import->imp_obd->obd_name, \ - __tmp->cl_dirty, __tmp->cl_dirty_max, \ - atomic_read(&obd_dirty_pages), obd_max_dirty_pages, \ - atomic_read(&obd_unstable_pages), obd_max_dirty_pages, \ + __tmp->cl_dirty_pages, __tmp->cl_dirty_max_pages, \ + atomic_long_read(&obd_dirty_pages), obd_max_dirty_pages, \ __tmp->cl_lost_grant, __tmp->cl_avail_grant, \ __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, \ - atomic_read(&__tmp->cl_lru_in_list), \ - atomic_read(&__tmp->cl_lru_busy), \ + atomic_long_read(&__tmp->cl_lru_in_list), \ + atomic_long_read(&__tmp->cl_lru_busy), \ atomic_read(&__tmp->cl_lru_shrinkers), ##args); \ } while (0) @@ -1404,8 +1405,8 @@ static void osc_consume_write_grant(struct client_obd *cli, { assert_spin_locked(&cli->cl_loi_list_lock); LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT)); - atomic_inc(&obd_dirty_pages); - cli->cl_dirty += PAGE_SIZE; + atomic_long_inc(&obd_dirty_pages); + cli->cl_dirty_pages++; pga->flag |= OBD_BRW_FROM_GRANT; CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n", PAGE_SIZE, pga, pga->pg); @@ -1424,12 +1425,12 @@ static void osc_release_write_grant(struct client_obd *cli, } pga->flag &= ~OBD_BRW_FROM_GRANT; - atomic_dec(&obd_dirty_pages); - cli->cl_dirty -= PAGE_SIZE; + atomic_long_dec(&obd_dirty_pages); + cli->cl_dirty_pages--; if (pga->flag & OBD_BRW_NOCACHE) { pga->flag &= ~OBD_BRW_NOCACHE; - atomic_dec(&obd_dirty_transit_pages); - cli->cl_dirty_transit -= PAGE_SIZE; + atomic_long_dec(&obd_dirty_transit_pages); + cli->cl_dirty_transit--; } } @@ -1494,11 +1495,11 @@ static void osc_unreserve_grant(struct client_obd *cli, static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages, unsigned int lost_grant) { - int grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax; + unsigned long grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax; spin_lock(&cli->cl_loi_list_lock); - atomic_sub(nr_pages, &obd_dirty_pages); - cli->cl_dirty -= nr_pages << PAGE_SHIFT; + atomic_long_sub(nr_pages, &obd_dirty_pages); + cli->cl_dirty_pages -= nr_pages; cli->cl_lost_grant += lost_grant; if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) { /* borrow some grant from truncate to avoid the case that @@ -1511,7 +1512,7 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages, spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu\n", lost_grant, cli->cl_lost_grant, - cli->cl_avail_grant, cli->cl_dirty); + cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_SHIFT); } /** @@ -1535,19 +1536,18 @@ static int osc_enter_cache_try(struct client_obd *cli, { int rc; - OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes); + OSC_DUMP_GRANT(D_CACHE, cli, "need:%d\n", bytes); rc = osc_reserve_grant(cli, bytes); if (rc < 0) return 0; - if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max && - atomic_read(&obd_unstable_pages) + 1 + - atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) { + if (cli->cl_dirty_pages <= cli->cl_dirty_max_pages && + atomic_long_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) { osc_consume_write_grant(cli, &oap->oap_brw_page); if 
(transient) { - cli->cl_dirty_transit += PAGE_SIZE; - atomic_inc(&obd_dirty_transit_pages); + cli->cl_dirty_transit++; + atomic_long_inc(&obd_dirty_transit_pages); oap->oap_brw_flags |= OBD_BRW_NOCACHE; } rc = 1; @@ -1581,11 +1581,13 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, struct osc_object *osc = oap->oap_obj; struct lov_oinfo *loi = osc->oo_oinfo; struct osc_cache_waiter ocw; - struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL, - LWI_ON_SIGNAL_NOOP, NULL); + struct l_wait_info lwi; int rc = -EDQUOT; - OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes); + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(AT_OFF ? obd_timeout : at_max), + NULL, LWI_ON_SIGNAL_NOOP, NULL); + + OSC_DUMP_GRANT(D_CACHE, cli, "need:%d\n", bytes); spin_lock(&cli->cl_loi_list_lock); @@ -1593,14 +1595,16 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, * of queued writes and create a discontiguous rpc stream */ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) || - cli->cl_dirty_max < PAGE_SIZE || - cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) { + !cli->cl_dirty_max_pages || cli->cl_ar.ar_force_sync || + loi->loi_ar.ar_force_sync) { + OSC_DUMP_GRANT(D_CACHE, cli, "forced sync i/o\n"); rc = -EDQUOT; goto out; } /* Hopefully normal case - cache space and write credits available */ if (osc_enter_cache_try(cli, oap, bytes, 0)) { + OSC_DUMP_GRANT(D_CACHE, cli, "granted from cache\n"); rc = 0; goto out; } @@ -1615,7 +1619,7 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, init_waitqueue_head(&ocw.ocw_waitq); ocw.ocw_oap = oap; ocw.ocw_grant = bytes; - while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) { + while (cli->cl_dirty_pages > 0 || cli->cl_w_in_flight > 0) { list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters); ocw.ocw_rc = 0; spin_unlock(&cli->cl_loi_list_lock); @@ -1629,32 +1633,49 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, spin_lock(&cli->cl_loi_list_lock); - /* l_wait_event is interrupted by signal, or timed out */ if (rc < 0) { - if (rc == -ETIMEDOUT) { - OSC_DUMP_GRANT(D_ERROR, cli, - "try to reserve %d.\n", bytes); - osc_extent_tree_dump(D_ERROR, osc); - rc = -EDQUOT; - } - + /* l_wait_event is interrupted by signal, or timed out */ list_del_init(&ocw.ocw_entry); - goto out; + break; } - LASSERT(list_empty(&ocw.ocw_entry)); rc = ocw.ocw_rc; if (rc != -EDQUOT) - goto out; + break; if (osc_enter_cache_try(cli, oap, bytes, 0)) { rc = 0; - goto out; + break; } } + + switch (rc) { + case 0: + OSC_DUMP_GRANT(D_CACHE, cli, "finally got grant space\n"); + break; + case -ETIMEDOUT: + OSC_DUMP_GRANT(D_CACHE, cli, + "timeout, fall back to sync i/o\n"); + osc_extent_tree_dump(D_CACHE, osc); + /* fall back to synchronous I/O */ + rc = -EDQUOT; + break; + case -EINTR: + /* Ensures restartability - LU-3581 */ + OSC_DUMP_GRANT(D_CACHE, cli, "interrupted\n"); + rc = -ERESTARTSYS; + break; + case -EDQUOT: + OSC_DUMP_GRANT(D_CACHE, cli, + "no grant space, fall back to sync i/o\n"); + break; + default: + CDEBUG(D_CACHE, "%s: event for cache space @ %p never arrived due to %d, fall back to sync i/o\n", + cli->cl_import->imp_obd->obd_name, &ocw, rc); + break; + } out: spin_unlock(&cli->cl_loi_list_lock); - OSC_DUMP_GRANT(D_CACHE, cli, "returned %d.\n", rc); return rc; } @@ -1670,19 +1691,17 @@ void osc_wake_cache_waiters(struct client_obd *cli) ocw->ocw_rc = -EDQUOT; /* we can't dirty more */ - if ((cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) || - 
(atomic_read(&obd_unstable_pages) + 1 + - atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) { - CDEBUG(D_CACHE, "no dirty room: dirty: %ld osc max %ld, sys max %d\n", - cli->cl_dirty, - cli->cl_dirty_max, obd_max_dirty_pages); + if ((cli->cl_dirty_pages > cli->cl_dirty_max_pages) || + (atomic_long_read(&obd_dirty_pages) + 1 > + obd_max_dirty_pages)) { + CDEBUG(D_CACHE, "no dirty room: dirty: %ld osc max %ld, sys max %ld\n", + cli->cl_dirty_pages, cli->cl_dirty_max_pages, + obd_max_dirty_pages); goto wakeup; } - ocw->ocw_rc = 0; - if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0)) - ocw->ocw_rc = -EDQUOT; - + if (osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0)) + ocw->ocw_rc = 0; wakeup: CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n", ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc); @@ -1843,97 +1862,6 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid, ar->ar_force_sync = 0; } -/** - * Performs "unstable" page accounting. This function balances the - * increment operations performed in osc_inc_unstable_pages. It is - * registered as the RPC request callback, and is executed when the - * bulk RPC is committed on the server. Thus at this point, the pages - * involved in the bulk transfer are no longer considered unstable. - */ -void osc_dec_unstable_pages(struct ptlrpc_request *req) -{ - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - struct ptlrpc_bulk_desc *desc = req->rq_bulk; - int page_count = desc->bd_iov_count; - int i; - - /* No unstable page tracking */ - if (!cli->cl_cache) - return; - - LASSERT(page_count >= 0); - - for (i = 0; i < page_count; i++) - dec_node_page_state(desc->bd_iov[i].kiov_page, - NR_UNSTABLE_NFS); - - atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr); - LASSERT(atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0); - - atomic_sub(page_count, &cli->cl_unstable_count); - LASSERT(atomic_read(&cli->cl_unstable_count) >= 0); - - atomic_sub(page_count, &obd_unstable_pages); - LASSERT(atomic_read(&obd_unstable_pages) >= 0); - - spin_lock(&req->rq_lock); - req->rq_committed = 1; - req->rq_unstable = 0; - spin_unlock(&req->rq_lock); - - wake_up_all(&cli->cl_cache->ccc_unstable_waitq); -} - -/* "unstable" page accounting. See: osc_dec_unstable_pages. */ -void osc_inc_unstable_pages(struct ptlrpc_request *req) -{ - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - struct ptlrpc_bulk_desc *desc = req->rq_bulk; - long page_count = desc->bd_iov_count; - int i; - - /* No unstable page tracking */ - if (!cli->cl_cache) - return; - - LASSERT(page_count >= 0); - - for (i = 0; i < page_count; i++) - inc_node_page_state(desc->bd_iov[i].kiov_page, - NR_UNSTABLE_NFS); - - LASSERT(atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0); - atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr); - - LASSERT(atomic_read(&cli->cl_unstable_count) >= 0); - atomic_add(page_count, &cli->cl_unstable_count); - - LASSERT(atomic_read(&obd_unstable_pages) >= 0); - atomic_add(page_count, &obd_unstable_pages); - - spin_lock(&req->rq_lock); - - /* - * If the request has already been committed (i.e. brw_commit - * called via rq_commit_cb), we need to undo the unstable page - * increments we just performed because rq_commit_cb wont be - * called again. 
Otherwise, just set the commit callback so the - * unstable page accounting is properly updated when the request - * is committed - */ - if (req->rq_committed) { - /* Drop lock before calling osc_dec_unstable_pages */ - spin_unlock(&req->rq_lock); - osc_dec_unstable_pages(req); - spin_lock(&req->rq_lock); - } else { - req->rq_unstable = 1; - req->rq_commit_cb = osc_dec_unstable_pages; - } - - spin_unlock(&req->rq_lock); -} - /* this must be called holding the loi list lock to give coverage to exit_cache, * async_flag maintenance, and oap_request */ @@ -1945,9 +1873,6 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, __u64 xid = 0; if (oap->oap_request) { - if (!rc) - osc_inc_unstable_pages(oap->oap_request); - xid = ptlrpc_req_xid(oap->oap_request); ptlrpc_req_finished(oap->oap_request); oap->oap_request = NULL; @@ -1979,7 +1904,7 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, */ static int try_to_add_extent_for_io(struct client_obd *cli, struct osc_extent *ext, struct list_head *rpclist, - int *pc, unsigned int *max_pages) + unsigned int *pc, unsigned int *max_pages) { struct osc_extent *tmp; struct osc_async_page *oap = list_first_entry(&ext->oe_pages, @@ -2032,12 +1957,13 @@ static int try_to_add_extent_for_io(struct client_obd *cli, * 5. Traverse the extent tree from the 1st extent; * 6. Above steps exit if there is no space in this RPC. */ -static int get_write_extents(struct osc_object *obj, struct list_head *rpclist) +static unsigned int get_write_extents(struct osc_object *obj, + struct list_head *rpclist) { struct client_obd *cli = osc_cli(obj); struct osc_extent *ext; struct osc_extent *temp; - int page_count = 0; + unsigned int page_count = 0; unsigned int max_pages = cli->cl_max_pages_per_rpc; LASSERT(osc_object_is_locked(obj)); @@ -2175,7 +2101,7 @@ osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli, struct osc_extent *ext; struct osc_extent *next; LIST_HEAD(rpclist); - int page_count = 0; + unsigned int page_count = 0; unsigned int max_pages = cli->cl_max_pages_per_rpc; int rc = 0; @@ -2390,7 +2316,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, struct client_obd *cli = oap->oap_cli; struct osc_object *osc = oap->oap_obj; pgoff_t index; - int grants = 0; + unsigned int grants = 0, tmp; int brw_flags = OBD_BRW_ASYNC; int cmd = OBD_BRW_WRITE; int need_release = 0; @@ -2434,9 +2360,6 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, return rc; } - if (osc_over_unstable_soft_limit(cli)) - brw_flags |= OBD_BRW_SOFT_SYNC; - oap->oap_cmd = cmd; oap->oap_page_off = ops->ops_from; oap->oap_count = ops->ops_to - ops->ops_from; @@ -2476,7 +2399,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, grants = 0; need_release = 1; } else if (ext->oe_end < index) { - int tmp = grants; + tmp = grants; /* try to expand this extent */ rc = osc_extent_expand(ext, index, &tmp); if (rc < 0) { @@ -2501,7 +2424,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, } if (!ext) { - int tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax; + tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax; /* try to find new extent to cover this page */ LASSERT(!oio->oi_active); @@ -2645,7 +2568,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io, goto out; spin_lock(&oap->oap_lock); - oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT; + oap->oap_async_flags |= ASYNC_READY | ASYNC_URGENT; spin_unlock(&oap->oap_lock); if 
(memory_pressure_get()) diff --git a/drivers/staging/lustre/lustre/osc/osc_cl_internal.h b/drivers/staging/lustre/lustre/osc/osc_cl_internal.h index c8c3f1ca77be..9c8de15c309c 100644 --- a/drivers/staging/lustre/lustre/osc/osc_cl_internal.h +++ b/drivers/staging/lustre/lustre/osc/osc_cl_internal.h @@ -64,7 +64,7 @@ struct osc_io { /** true if this io is lockless. */ unsigned int oi_lockless; /** how many LRU pages are reserved for this IO */ - int oi_lru_reserved; + unsigned long oi_lru_reserved; /** active extents, we know how many bytes is going to be written, * so having an active extent will prevent it from being fragmented @@ -389,7 +389,7 @@ extern struct lu_device_type osc_device_type; extern struct lu_context_key osc_key; extern struct lu_context_key osc_session_key; -#define OSC_FLAGS (ASYNC_URGENT|ASYNC_READY) +#define OSC_FLAGS (ASYNC_URGENT | ASYNC_READY) int osc_lock_init(const struct lu_env *env, struct cl_object *obj, struct cl_lock *lock, @@ -608,7 +608,7 @@ struct osc_extent { /** link list of osc_object's oo_{hp|urgent|locking}_exts. */ struct list_head oe_link; /** state of this extent */ - unsigned int oe_state; + enum osc_extent_state oe_state; /** flags for this extent. */ unsigned int oe_intree:1, /** 0 is write, 1 is read */ diff --git a/drivers/staging/lustre/lustre/osc/osc_internal.h b/drivers/staging/lustre/lustre/osc/osc_internal.h index 7a27f0961955..67fe0a254991 100644 --- a/drivers/staging/lustre/lustre/osc/osc_internal.h +++ b/drivers/staging/lustre/lustre/osc/osc_internal.h @@ -71,7 +71,6 @@ struct osc_async_page { struct client_obd *oap_cli; struct osc_object *oap_obj; - struct ldlm_lock *oap_ldlm_lock; spinlock_t oap_lock; }; @@ -134,9 +133,9 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg); int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, struct list_head *ext_list, int cmd); -int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli, - int target, bool force); -int osc_lru_reclaim(struct client_obd *cli); +long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli, + long target, bool force); +long osc_lru_reclaim(struct client_obd *cli); unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock); @@ -198,7 +197,7 @@ int osc_quotacheck(struct obd_device *unused, struct obd_export *exp, int osc_quota_poll_check(struct obd_export *exp, struct if_quotacheck *qchk); void osc_inc_unstable_pages(struct ptlrpc_request *req); void osc_dec_unstable_pages(struct ptlrpc_request *req); -int osc_over_unstable_soft_limit(struct client_obd *cli); +bool osc_over_unstable_soft_limit(struct client_obd *cli); struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env, struct osc_object *obj, pgoff_t index, diff --git a/drivers/staging/lustre/lustre/osc/osc_io.c b/drivers/staging/lustre/lustre/osc/osc_io.c index 6e3dcd38913f..8a559cbcdd0c 100644 --- a/drivers/staging/lustre/lustre/osc/osc_io.c +++ b/drivers/staging/lustre/lustre/osc/osc_io.c @@ -109,11 +109,11 @@ static int osc_io_submit(const struct lu_env *env, struct cl_page_list *qin = &queue->c2_qin; struct cl_page_list *qout = &queue->c2_qout; - int queued = 0; + unsigned int queued = 0; int result = 0; int cmd; int brw_flags; - int max_pages; + unsigned int max_pages; LASSERT(qin->pl_nr > 0); @@ -163,14 +163,19 @@ static int osc_io_submit(const struct lu_env *env, continue; } - cl_page_list_move(qout, qin, page); spin_lock(&oap->oap_lock); - oap->oap_async_flags = 
ASYNC_URGENT|ASYNC_READY; + oap->oap_async_flags = ASYNC_URGENT | ASYNC_READY; oap->oap_async_flags |= ASYNC_COUNT_STABLE; spin_unlock(&oap->oap_lock); osc_page_submit(env, opg, crt, brw_flags); list_add_tail(&oap->oap_pending_item, &list); + + if (page->cp_sync_io) + cl_page_list_move(qout, qin, page); + else /* async IO */ + cl_page_list_del(env, qin, page); + if (++queued == max_pages) { queued = 0; result = osc_queue_sync_pages(env, osc, &list, cmd, @@ -195,7 +200,7 @@ static int osc_io_submit(const struct lu_env *env, * Expand stripe KMS if necessary. */ static void osc_page_touch_at(const struct lu_env *env, - struct cl_object *obj, pgoff_t idx, unsigned to) + struct cl_object *obj, pgoff_t idx, size_t to) { struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; struct cl_attr *attr = &osc_env_info(env)->oti_attr; @@ -228,7 +233,7 @@ static void osc_page_touch_at(const struct lu_env *env, attr->cat_size = kms; valid |= CAT_SIZE; } - cl_object_attr_set(env, obj, attr, valid); + cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); } @@ -314,8 +319,8 @@ static int osc_io_rw_iter_init(const struct lu_env *env, struct osc_object *osc = cl2osc(ios->cis_obj); struct client_obd *cli = osc_cli(osc); unsigned long c; - unsigned int npages; - unsigned int max_pages; + unsigned long npages; + unsigned long max_pages; if (cl_io_is_append(io)) return 0; @@ -328,15 +333,15 @@ static int osc_io_rw_iter_init(const struct lu_env *env, if (npages > max_pages) npages = max_pages; - c = atomic_read(cli->cl_lru_left); + c = atomic_long_read(cli->cl_lru_left); if (c < npages && osc_lru_reclaim(cli) > 0) - c = atomic_read(cli->cl_lru_left); + c = atomic_long_read(cli->cl_lru_left); while (c >= npages) { - if (c == atomic_cmpxchg(cli->cl_lru_left, c, c - npages)) { + if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) { oio->oi_lru_reserved = npages; break; } - c = atomic_read(cli->cl_lru_left); + c = atomic_long_read(cli->cl_lru_left); } return 0; @@ -350,7 +355,7 @@ static void osc_io_rw_iter_fini(const struct lu_env *env, struct client_obd *cli = osc_cli(osc); if (oio->oi_lru_reserved > 0) { - atomic_add(oio->oi_lru_reserved, cli->cl_lru_left); + atomic_long_add(oio->oi_lru_reserved, cli->cl_lru_left); oio->oi_lru_reserved = 0; } oio->oi_write_osclock = NULL; @@ -364,7 +369,7 @@ static int osc_io_fault_start(const struct lu_env *env, io = ios->cis_io; fio = &io->u.ci_fault; - CDEBUG(D_INFO, "%lu %d %d\n", + CDEBUG(D_INFO, "%lu %d %zu\n", fio->ft_index, fio->ft_writable, fio->ft_nob); /* * If mapping is writeable, adjust kms to cover this page, @@ -471,18 +476,21 @@ static int osc_io_setattr_start(const struct lu_env *env, attr->cat_ctime = lvb->lvb_ctime; cl_valid |= CAT_CTIME; } - result = cl_object_attr_set(env, obj, attr, cl_valid); + result = cl_object_attr_update(env, obj, attr, + cl_valid); } cl_object_attr_unlock(obj); } memset(oa, 0, sizeof(*oa)); if (result == 0) { oa->o_oi = loi->loi_oi; + obdo_set_parent_fid(oa, io->u.ci_setattr.sa_parent_fid); + oa->o_stripe_idx = io->u.ci_setattr.sa_stripe_index; oa->o_mtime = attr->cat_mtime; oa->o_atime = attr->cat_atime; oa->o_ctime = attr->cat_ctime; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME | - OBD_MD_FLCTIME | OBD_MD_FLMTIME; + oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME | + OBD_MD_FLCTIME | OBD_MD_FLMTIME; if (ia_valid & ATTR_SIZE) { oa->o_size = size; oa->o_blocks = OBD_OBJECT_EOF; @@ -559,7 +567,7 @@ static int osc_io_read_start(const struct lu_env *env, if (!slice->cis_io->ci_noatime) { 
cl_object_attr_lock(obj); attr->cat_atime = ktime_get_real_seconds(); - rc = cl_object_attr_set(env, obj, attr, CAT_ATIME); + rc = cl_object_attr_update(env, obj, attr, CAT_ATIME); cl_object_attr_unlock(obj); } return rc; @@ -576,7 +584,7 @@ static int osc_io_write_start(const struct lu_env *env, cl_object_attr_lock(obj); attr->cat_ctime = ktime_get_real_seconds(); attr->cat_mtime = attr->cat_ctime; - rc = cl_object_attr_set(env, obj, attr, CAT_MTIME | CAT_CTIME); + rc = cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME); cl_object_attr_unlock(obj); return rc; diff --git a/drivers/staging/lustre/lustre/osc/osc_lock.c b/drivers/staging/lustre/lustre/osc/osc_lock.c index 717d3ffb6789..39a8a5851603 100644 --- a/drivers/staging/lustre/lustre/osc/osc_lock.c +++ b/drivers/staging/lustre/lustre/osc/osc_lock.c @@ -222,7 +222,7 @@ static void osc_lock_lvb_update(const struct lu_env *env, ldlm_lock_allow_match_locked(dlmlock); } - cl_object_attr_set(env, obj, attr, valid); + cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); } @@ -467,7 +467,7 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env, */ attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms); - cl_object_attr_set(env, obj, attr, CAT_KMS); + cl_object_attr_update(env, obj, attr, CAT_KMS); cl_object_attr_unlock(obj); unlock_res_and_lock(dlmlock); diff --git a/drivers/staging/lustre/lustre/osc/osc_object.c b/drivers/staging/lustre/lustre/osc/osc_object.c index d211d1905e83..aae3a2d4243f 100644 --- a/drivers/staging/lustre/lustre/osc/osc_object.c +++ b/drivers/staging/lustre/lustre/osc/osc_object.c @@ -159,8 +159,8 @@ static int osc_attr_get(const struct lu_env *env, struct cl_object *obj, return 0; } -static int osc_attr_set(const struct lu_env *env, struct cl_object *obj, - const struct cl_attr *attr, unsigned valid) +static int osc_attr_update(const struct lu_env *env, struct cl_object *obj, + const struct cl_attr *attr, unsigned int valid) { struct lov_oinfo *oinfo = cl2osc(obj)->oo_oinfo; struct ost_lvb *lvb = &oinfo->loi_lvb; @@ -195,7 +195,6 @@ static int osc_object_glimpse(const struct lu_env *env, static int osc_object_ast_clear(struct ldlm_lock *lock, void *data) { - LASSERT(lock->l_granted_mode == lock->l_req_mode); if (lock->l_ast_data == data) lock->l_ast_data = NULL; return LDLM_ITER_CONTINUE; @@ -262,7 +261,7 @@ static const struct cl_object_operations osc_ops = { .coo_lock_init = osc_lock_init, .coo_io_init = osc_io_init, .coo_attr_get = osc_attr_get, - .coo_attr_set = osc_attr_set, + .coo_attr_update = osc_attr_update, .coo_glimpse = osc_object_glimpse, .coo_prune = osc_object_prune }; diff --git a/drivers/staging/lustre/lustre/osc/osc_page.c b/drivers/staging/lustre/lustre/osc/osc_page.c index 355f496a2093..2a7a70aa9e80 100644 --- a/drivers/staging/lustre/lustre/osc/osc_page.c +++ b/drivers/staging/lustre/lustre/osc/osc_page.c @@ -323,32 +323,6 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj, return result; } -int osc_over_unstable_soft_limit(struct client_obd *cli) -{ - long obd_upages, obd_dpages, osc_upages; - - /* Can't check cli->cl_unstable_count, therefore, no soft limit */ - if (!cli) - return 0; - - obd_upages = atomic_read(&obd_unstable_pages); - obd_dpages = atomic_read(&obd_dirty_pages); - - osc_upages = atomic_read(&cli->cl_unstable_count); - - /* - * obd_max_dirty_pages is the max number of (dirty + unstable) - * pages allowed at any given time. 
To simulate an unstable page - * only limit, we subtract the current number of dirty pages - * from this max. This difference is roughly the amount of pages - * currently available for unstable pages. Thus, the soft limit - * is half of that difference. Check osc_upages to ensure we don't - * set SOFT_SYNC for OSCs without any outstanding unstable pages. - */ - return osc_upages && - obd_upages >= (obd_max_dirty_pages - obd_dpages) / 2; -} - /** * Helper function called by osc_io_submit() for every page in an immediate * transfer (i.e., transferred synchronously). @@ -368,9 +342,6 @@ void osc_page_submit(const struct lu_env *env, struct osc_page *opg, oap->oap_count = opg->ops_to - opg->ops_from; oap->oap_brw_flags = brw_flags | OBD_BRW_SYNC; - if (osc_over_unstable_soft_limit(oap->oap_cli)) - oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC; - if (capable(CFS_CAP_SYS_RESOURCE)) { oap->oap_brw_flags |= OBD_BRW_NOQUOTA; oap->oap_cmd |= OBD_BRW_NOQUOTA; @@ -409,7 +380,7 @@ static const int lru_shrink_max = 8 << (20 - PAGE_SHIFT); /* 8M */ static int osc_cache_too_much(struct client_obd *cli) { struct cl_client_cache *cache = cli->cl_cache; - int pages = atomic_read(&cli->cl_lru_in_list); + long pages = atomic_long_read(&cli->cl_lru_in_list); unsigned long budget; budget = cache->ccc_lru_max / (atomic_read(&cache->ccc_users) - 2); @@ -417,7 +388,7 @@ static int osc_cache_too_much(struct client_obd *cli) /* if it's going to run out LRU slots, we should free some, but not * too much to maintain fairness among OSCs. */ - if (atomic_read(cli->cl_lru_left) < cache->ccc_lru_max >> 4) { + if (atomic_long_read(cli->cl_lru_left) < cache->ccc_lru_max >> 4) { if (pages >= budget) return lru_shrink_max; else if (pages >= budget / 2) @@ -444,7 +415,7 @@ void osc_lru_add_batch(struct client_obd *cli, struct list_head *plist) { LIST_HEAD(lru); struct osc_async_page *oap; - int npages = 0; + long npages = 0; list_for_each_entry(oap, plist, oap_pending_item) { struct osc_page *opg = oap2osc_page(oap); @@ -460,8 +431,8 @@ void osc_lru_add_batch(struct client_obd *cli, struct list_head *plist) if (npages > 0) { spin_lock(&cli->cl_lru_list_lock); list_splice_tail(&lru, &cli->cl_lru_list); - atomic_sub(npages, &cli->cl_lru_busy); - atomic_add(npages, &cli->cl_lru_in_list); + atomic_long_sub(npages, &cli->cl_lru_busy); + atomic_long_add(npages, &cli->cl_lru_in_list); spin_unlock(&cli->cl_lru_list_lock); /* XXX: May set force to be true for better performance */ @@ -472,9 +443,9 @@ void osc_lru_add_batch(struct client_obd *cli, struct list_head *plist) static void __osc_lru_del(struct client_obd *cli, struct osc_page *opg) { - LASSERT(atomic_read(&cli->cl_lru_in_list) > 0); + LASSERT(atomic_long_read(&cli->cl_lru_in_list) > 0); list_del_init(&opg->ops_lru); - atomic_dec(&cli->cl_lru_in_list); + atomic_long_dec(&cli->cl_lru_in_list); } /** @@ -488,12 +459,12 @@ static void osc_lru_del(struct client_obd *cli, struct osc_page *opg) if (!list_empty(&opg->ops_lru)) { __osc_lru_del(cli, opg); } else { - LASSERT(atomic_read(&cli->cl_lru_busy) > 0); - atomic_dec(&cli->cl_lru_busy); + LASSERT(atomic_long_read(&cli->cl_lru_busy) > 0); + atomic_long_dec(&cli->cl_lru_busy); } spin_unlock(&cli->cl_lru_list_lock); - atomic_inc(cli->cl_lru_left); + atomic_long_inc(cli->cl_lru_left); /* this is a great place to release more LRU pages if * this osc occupies too many LRU pages and kernel is * stealing one of them. 
@@ -518,7 +489,7 @@ static void osc_lru_use(struct client_obd *cli, struct osc_page *opg) spin_lock(&cli->cl_lru_list_lock); __osc_lru_del(cli, opg); spin_unlock(&cli->cl_lru_list_lock); - atomic_inc(&cli->cl_lru_busy); + atomic_long_inc(&cli->cl_lru_busy); } } @@ -540,10 +511,32 @@ static void discard_pagevec(const struct lu_env *env, struct cl_io *io, } /** + * Check if a cl_page can be released, i.e, it's not being used. + * + * If unstable account is turned on, bulk transfer may hold one refcount + * for recovery so we need to check vmpage refcount as well; otherwise, + * even we can destroy cl_page but the corresponding vmpage can't be reused. + */ +static inline bool lru_page_busy(struct client_obd *cli, struct cl_page *page) +{ + if (cl_page_in_use_noref(page)) + return true; + + if (cli->cl_cache->ccc_unstable_check) { + struct page *vmpage = cl_page_vmpage(page); + + /* vmpage have two known users: cl_page and VM page cache */ + if (page_count(vmpage) - page_mapcount(vmpage) > 2) + return true; + } + return false; +} + +/** * Drop @target of pages from LRU at most. */ -int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli, - int target, bool force) +long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli, + long target, bool force) { struct cl_io *io; struct cl_object *clobj = NULL; @@ -551,12 +544,12 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli, struct osc_page *opg; struct osc_page *temp; int maxscan = 0; - int count = 0; + long count = 0; int index = 0; int rc = 0; - LASSERT(atomic_read(&cli->cl_lru_in_list) >= 0); - if (atomic_read(&cli->cl_lru_in_list) == 0 || target <= 0) + LASSERT(atomic_long_read(&cli->cl_lru_in_list) >= 0); + if (atomic_long_read(&cli->cl_lru_in_list) == 0 || target <= 0) return 0; if (!force) { @@ -575,7 +568,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli, io = &osc_env_info(env)->oti_io; spin_lock(&cli->cl_lru_list_lock); - maxscan = min(target << 1, atomic_read(&cli->cl_lru_in_list)); + maxscan = min(target << 1, atomic_long_read(&cli->cl_lru_in_list)); list_for_each_entry_safe(opg, temp, &cli->cl_lru_list, ops_lru) { struct cl_page *page; bool will_free = false; @@ -584,7 +577,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli, break; page = opg->ops_cl.cpl_page; - if (cl_page_in_use_noref(page)) { + if (lru_page_busy(cli, page)) { list_move_tail(&opg->ops_lru, &cli->cl_lru_list); continue; } @@ -620,7 +613,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli, } if (cl_page_own_try(env, io, page) == 0) { - if (!cl_page_in_use_noref(page)) { + if (!lru_page_busy(cli, page)) { /* remove it from lru list earlier to avoid * lock contention */ @@ -663,24 +656,19 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli, atomic_dec(&cli->cl_lru_shrinkers); if (count > 0) { - atomic_add(count, cli->cl_lru_left); + atomic_long_add(count, cli->cl_lru_left); wake_up_all(&osc_lru_waitq); } return count > 0 ? 
count : rc; } -static inline int max_to_shrink(struct client_obd *cli) -{ - return min(atomic_read(&cli->cl_lru_in_list) >> 1, lru_shrink_max); -} - -int osc_lru_reclaim(struct client_obd *cli) +long osc_lru_reclaim(struct client_obd *cli) { struct cl_env_nest nest; struct lu_env *env; struct cl_client_cache *cache = cli->cl_cache; int max_scans; - int rc = 0; + long rc = 0; LASSERT(cache); @@ -693,15 +681,15 @@ int osc_lru_reclaim(struct client_obd *cli) if (rc == -EBUSY) rc = 0; - CDEBUG(D_CACHE, "%s: Free %d pages from own LRU: %p.\n", + CDEBUG(D_CACHE, "%s: Free %ld pages from own LRU: %p.\n", cli->cl_import->imp_obd->obd_name, rc, cli); goto out; } - CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %d, busy: %d.\n", + CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %ld, busy: %ld.\n", cli->cl_import->imp_obd->obd_name, cli, - atomic_read(&cli->cl_lru_in_list), - atomic_read(&cli->cl_lru_busy)); + atomic_long_read(&cli->cl_lru_in_list), + atomic_long_read(&cli->cl_lru_busy)); /* Reclaim LRU slots from other client_obd as it can't free enough * from its own. This should rarely happen. @@ -717,10 +705,10 @@ int osc_lru_reclaim(struct client_obd *cli) cli = list_entry(cache->ccc_lru.next, struct client_obd, cl_lru_osc); - CDEBUG(D_CACHE, "%s: cli %p LRU pages: %d, busy: %d.\n", + CDEBUG(D_CACHE, "%s: cli %p LRU pages: %ld, busy: %ld.\n", cli->cl_import->imp_obd->obd_name, cli, - atomic_read(&cli->cl_lru_in_list), - atomic_read(&cli->cl_lru_busy)); + atomic_long_read(&cli->cl_lru_in_list), + atomic_long_read(&cli->cl_lru_busy)); list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru); if (osc_cache_too_much(cli) > 0) { @@ -737,11 +725,18 @@ int osc_lru_reclaim(struct client_obd *cli) out: cl_env_nested_put(&nest, env); - CDEBUG(D_CACHE, "%s: cli %p freed %d pages.\n", + CDEBUG(D_CACHE, "%s: cli %p freed %ld pages.\n", cli->cl_import->imp_obd->obd_name, cli, rc); return rc; } +/** + * osc_lru_reserve() is called to reserve an LRU slot for a cl_page. + * + * Usually the LRU slots are reserved in osc_io_iter_rw_init(). + * Only in the case that the LRU slots are in extreme shortage, it should + * have reserved enough slots for an IO. + */ static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj, struct osc_page *opg) { @@ -758,8 +753,8 @@ static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj, goto out; } - LASSERT(atomic_read(cli->cl_lru_left) >= 0); - while (!atomic_add_unless(cli->cl_lru_left, -1, 0)) { + LASSERT(atomic_long_read(cli->cl_lru_left) >= 0); + while (!atomic_long_add_unless(cli->cl_lru_left, -1, 0)) { /* run out of LRU spaces, try to drop some by itself */ rc = osc_lru_reclaim(cli); if (rc < 0) @@ -770,7 +765,7 @@ static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj, cond_resched(); rc = l_wait_event(osc_lru_waitq, - atomic_read(cli->cl_lru_left) > 0, + atomic_long_read(cli->cl_lru_left) > 0, &lwi); if (rc < 0) @@ -779,7 +774,7 @@ static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj, out: if (rc >= 0) { - atomic_inc(&cli->cl_lru_busy); + atomic_long_inc(&cli->cl_lru_busy); opg->ops_in_lru = 1; rc = 0; } @@ -787,4 +782,151 @@ out: return rc; } +/** + * Atomic operations are expensive. We accumulate the accounting for the + * same page pgdat to get better performance. + * In practice this can work pretty good because the pages in the same RPC + * are likely from the same page zone. 
+ */ +static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc, + int factor) +{ + int page_count = desc->bd_iov_count; + pg_data_t *last = NULL; + int count = 0; + int i; + + for (i = 0; i < page_count; i++) { + pg_data_t *pgdat = page_pgdat(desc->bd_iov[i].bv_page); + + if (likely(pgdat == last)) { + ++count; + continue; + } + + if (count > 0) { + mod_node_page_state(pgdat, NR_UNSTABLE_NFS, + factor * count); + count = 0; + } + last = pgdat; + ++count; + } + if (count > 0) + mod_node_page_state(last, NR_UNSTABLE_NFS, factor * count); +} + +static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc) +{ + unstable_page_accounting(desc, 1); +} + +static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc) +{ + unstable_page_accounting(desc, -1); +} + +/** + * Performs "unstable" page accounting. This function balances the + * increment operations performed in osc_inc_unstable_pages. It is + * registered as the RPC request callback, and is executed when the + * bulk RPC is committed on the server. Thus at this point, the pages + * involved in the bulk transfer are no longer considered unstable. + * + * If this function is called, the request should have been committed + * or req:rq_unstable must have been set; it implies that the unstable + * statistic have been added. + */ +void osc_dec_unstable_pages(struct ptlrpc_request *req) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + struct ptlrpc_bulk_desc *desc = req->rq_bulk; + int page_count = desc->bd_iov_count; + long unstable_count; + + LASSERT(page_count >= 0); + dec_unstable_page_accounting(desc); + + unstable_count = atomic_long_sub_return(page_count, + &cli->cl_unstable_count); + LASSERT(unstable_count >= 0); + + unstable_count = atomic_long_sub_return(page_count, + &cli->cl_cache->ccc_unstable_nr); + LASSERT(unstable_count >= 0); + if (!unstable_count) + wake_up_all(&cli->cl_cache->ccc_unstable_waitq); + + if (osc_cache_too_much(cli)) + (void)ptlrpcd_queue_work(cli->cl_lru_work); +} + +/** + * "unstable" page accounting. See: osc_dec_unstable_pages. + */ +void osc_inc_unstable_pages(struct ptlrpc_request *req) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + struct ptlrpc_bulk_desc *desc = req->rq_bulk; + long page_count = desc->bd_iov_count; + + /* No unstable page tracking */ + if (!cli->cl_cache || !cli->cl_cache->ccc_unstable_check) + return; + + add_unstable_page_accounting(desc); + atomic_long_add(page_count, &cli->cl_unstable_count); + atomic_long_add(page_count, &cli->cl_cache->ccc_unstable_nr); + + /* + * If the request has already been committed (i.e. brw_commit + * called via rq_commit_cb), we need to undo the unstable page + * increments we just performed because rq_commit_cb wont be + * called again. + */ + spin_lock(&req->rq_lock); + if (unlikely(req->rq_committed)) { + spin_unlock(&req->rq_lock); + + osc_dec_unstable_pages(req); + } else { + req->rq_unstable = 1; + spin_unlock(&req->rq_lock); + } +} + +/** + * Check if it piggybacks SOFT_SYNC flag to OST from this OSC. + * This function will be called by every BRW RPC so it's critical + * to make this function fast. 
+ */ +bool osc_over_unstable_soft_limit(struct client_obd *cli) +{ + long unstable_nr, osc_unstable_count; + + /* Can't check cli->cl_unstable_count, therefore, no soft limit */ + if (!cli->cl_cache || !cli->cl_cache->ccc_unstable_check) + return false; + + osc_unstable_count = atomic_long_read(&cli->cl_unstable_count); + unstable_nr = atomic_long_read(&cli->cl_cache->ccc_unstable_nr); + + CDEBUG(D_CACHE, + "%s: cli: %p unstable pages: %lu, osc unstable pages: %lu\n", + cli->cl_import->imp_obd->obd_name, cli, + unstable_nr, osc_unstable_count); + + /* + * If the LRU slots are in shortage - 25% remaining AND this OSC + * has one full RPC window of unstable pages, it's a good chance + * to piggyback a SOFT_SYNC flag. + * Please notice that the OST won't take immediate response for the + * SOFT_SYNC request so active OSCs will have more chance to carry + * the flag, this is reasonable. + */ + return unstable_nr > cli->cl_cache->ccc_lru_max >> 2 && + osc_unstable_count > cli->cl_max_pages_per_rpc * + cli->cl_max_rpcs_in_flight; +} + /** @} osc */ diff --git a/drivers/staging/lustre/lustre/osc/osc_request.c b/drivers/staging/lustre/lustre/osc/osc_request.c index 536b868ff776..749781f022e2 100644 --- a/drivers/staging/lustre/lustre/osc/osc_request.c +++ b/drivers/staging/lustre/lustre/osc/osc_request.c @@ -41,6 +41,7 @@ #include "../include/lustre_ha.h" #include "../include/lprocfs_status.h" +#include "../include/lustre/lustre_ioctl.h" #include "../include/lustre_debug.h" #include "../include/lustre_param.h" #include "../include/lustre_fid.h" @@ -102,36 +103,6 @@ static void osc_release_ppga(struct brw_page **ppga, u32 count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); -/* Pack OSC object metadata for disk storage (LE byte order). */ -static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, - struct lov_stripe_md *lsm) -{ - int lmm_size; - - lmm_size = sizeof(**lmmp); - if (!lmmp) - return lmm_size; - - if (*lmmp && !lsm) { - kfree(*lmmp); - *lmmp = NULL; - return 0; - } else if (unlikely(lsm && ostid_id(&lsm->lsm_oi) == 0)) { - return -EBADF; - } - - if (!*lmmp) { - *lmmp = kzalloc(lmm_size, GFP_NOFS); - if (!*lmmp) - return -ENOMEM; - } - - if (lsm) - ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi); - - return lmm_size; -} - /* Unpack OSC object metadata from disk storage (LE byte order). 
*/ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, struct lov_mds_md *lmm, int lmm_bytes) @@ -189,7 +160,7 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; else - (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; + (*lsmp)->lsm_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES; return lsm_size; } @@ -427,24 +398,16 @@ static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo, oinfo->oi_cb_up, oinfo, rqset); } -static int osc_real_create(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md **ea, - struct obd_trans_info *oti) +static int osc_create(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct obd_trans_info *oti) { struct ptlrpc_request *req; struct ost_body *body; - struct lov_stripe_md *lsm; int rc; LASSERT(oa); - LASSERT(ea); - - lsm = *ea; - if (!lsm) { - rc = obd_alloc_memmd(exp, &lsm); - if (rc < 0) - return rc; - } + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi))); req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE); if (!req) { @@ -490,21 +453,10 @@ static int osc_real_create(struct obd_export *exp, struct obdo *oa, oa->o_blksize = cli_brw_size(exp->exp_obd); oa->o_valid |= OBD_MD_FLBLKSZ; - /* XXX LOV STACKING: the lsm that is passed to us from LOV does not - * have valid lsm_oinfo data structs, so don't go touching that. - * This needs to be fixed in a big way. - */ - lsm->lsm_oi = oa->o_oi; - *ea = lsm; - - if (oti) { - oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg); - - if (oa->o_valid & OBD_MD_FLCOOKIE) { - if (!oti->oti_logcookies) - oti_alloc_cookies(oti, 1); - *oti->oti_logcookies = oa->o_lcookie; - } + if (oti && oa->o_valid & OBD_MD_FLCOOKIE) { + if (!oti->oti_logcookies) + oti->oti_logcookies = &oti->oti_onecookie; + *oti->oti_logcookies = oa->o_lcookie; } CDEBUG(D_HA, "transno: %lld\n", @@ -512,8 +464,6 @@ static int osc_real_create(struct obd_export *exp, struct obdo *oa, out_req: ptlrpc_req_finished(req); out: - if (rc && !*ea) - obd_free_memmd(exp, &lsm); return rc; } @@ -649,7 +599,7 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, ostid_build_res_name(&oa->o_oi, &res_id); res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); - if (!res) + if (IS_ERR(res)) return 0; LDLM_RESOURCE_ADDREF(res); @@ -689,30 +639,6 @@ static int osc_can_send_destroy(struct client_obd *cli) return 0; } -static int osc_create(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct lov_stripe_md **ea, - struct obd_trans_info *oti) -{ - int rc = 0; - - LASSERT(oa); - LASSERT(ea); - LASSERT(oa->o_valid & OBD_MD_FLGROUP); - - if ((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_RECREATE_OBJS) { - return osc_real_create(exp, oa, ea, oti); - } - - if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi))) - return osc_real_create(exp, oa, ea, oti); - - /* we should not get here anymore */ - LBUG(); - - return rc; -} - /* Destroy requests can be async always on the client, and we don't even really * care about the return code since the client cannot do anything at all about * a destroy failure. @@ -725,8 +651,7 @@ static int osc_create(const struct lu_env *env, struct obd_export *exp, * cookies to the MDS after committing destroy transactions. 
*/ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct lov_stripe_md *ea, - struct obd_trans_info *oti, struct obd_export *md_export) + struct obdo *oa, struct obd_trans_info *oti) { struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; @@ -794,42 +719,44 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, long writing_bytes) { - u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; + u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT; LASSERT(!(oa->o_valid & bits)); oa->o_valid |= bits; spin_lock(&cli->cl_loi_list_lock); - oa->o_dirty = cli->cl_dirty; - if (unlikely(cli->cl_dirty - cli->cl_dirty_transit > - cli->cl_dirty_max)) { + oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT; + if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit > + cli->cl_dirty_max_pages)) { CERROR("dirty %lu - %lu > dirty_max %lu\n", - cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); + cli->cl_dirty_pages, cli->cl_dirty_transit, + cli->cl_dirty_max_pages); oa->o_undirty = 0; - } else if (unlikely(atomic_read(&obd_unstable_pages) + - atomic_read(&obd_dirty_pages) - - atomic_read(&obd_dirty_transit_pages) > - (long)(obd_max_dirty_pages + 1))) { + } else if (unlikely(atomic_long_read(&obd_dirty_pages) - + atomic_long_read(&obd_dirty_transit_pages) > + (obd_max_dirty_pages + 1))) { /* The atomic_read() allowing the atomic_inc() are * not covered by a lock thus they may safely race and trip * this CERROR() unless we add in a small fudge factor (+1). */ - CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n", + CERROR("%s: dirty %ld + %ld > system dirty_max %lu\n", cli->cl_import->imp_obd->obd_name, - atomic_read(&obd_unstable_pages), - atomic_read(&obd_dirty_pages), - atomic_read(&obd_dirty_transit_pages), + atomic_long_read(&obd_dirty_pages), + atomic_long_read(&obd_dirty_transit_pages), obd_max_dirty_pages); oa->o_undirty = 0; - } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) { + } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages > + 0x7fffffff)) { CERROR("dirty %lu - dirty_max %lu too big???\n", - cli->cl_dirty, cli->cl_dirty_max); + cli->cl_dirty_pages, cli->cl_dirty_max_pages); oa->o_undirty = 0; } else { - long max_in_flight = (cli->cl_max_pages_per_rpc << - PAGE_SHIFT)* - (cli->cl_max_rpcs_in_flight + 1); - oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); + unsigned long max_in_flight; + + max_in_flight = (cli->cl_max_pages_per_rpc << PAGE_SHIFT) * + (cli->cl_max_rpcs_in_flight + 1); + oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_SHIFT, + max_in_flight); } oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; oa->o_dropped = cli->cl_lost_grant; @@ -1029,22 +956,24 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) { /* * ocd_grant is the total grant amount we're expect to hold: if we've - * been evicted, it's the new avail_grant amount, cl_dirty will drop - * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty. + * been evicted, it's the new avail_grant amount, cl_dirty_pages will + * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant + + * dirty. * * race is tolerable here: if we're evicted, but imp_state already - * left EVICTED state, then cl_dirty must be 0 already. + * left EVICTED state, then cl_dirty_pages must be 0 already. 
*/ spin_lock(&cli->cl_loi_list_lock); if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED) cli->cl_avail_grant = ocd->ocd_grant; else - cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty; + cli->cl_avail_grant = ocd->ocd_grant - + (cli->cl_dirty_pages << PAGE_SHIFT); if (cli->cl_avail_grant < 0) { CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n", cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant, - ocd->ocd_grant, cli->cl_dirty); + ocd->ocd_grant, cli->cl_dirty_pages << PAGE_SHIFT); /* workaround for servers which do not have the patch from * LU-2679 */ @@ -1181,7 +1110,7 @@ static u32 osc_checksum_bulk(int nob, u32 pg_count, } while (nob > 0 && pg_count > 0) { - int count = pga[i]->count > nob ? nob : pga[i]->count; + unsigned int count = pga[i]->count > nob ? nob : pga[i]->count; /* corrupt the data before we compute the checksum, to * simulate an OST->client data error @@ -1191,7 +1120,7 @@ static u32 osc_checksum_bulk(int nob, u32 pg_count, unsigned char *ptr = kmap(pga[i]->pg); int off = pga[i]->off & ~PAGE_MASK; - memcpy(ptr + off, "bad1", min(4, nob)); + memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob)); kunmap(pga[i]->pg); } cfs_crypto_hash_update_page(hdesc, pga[i]->pg, @@ -1335,11 +1264,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli, if (i > 0 && can_merge_pages(pg_prev, pg)) { niobuf--; - niobuf->len += pg->count; + niobuf->rnb_len += pg->count; } else { - niobuf->offset = pg->off; - niobuf->len = pg->count; - niobuf->flags = pg->flag; + niobuf->rnb_offset = pg->off; + niobuf->rnb_len = pg->count; + niobuf->rnb_flags = pg->flag; } pg_prev = pg; } @@ -1418,6 +1347,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli, INIT_LIST_HEAD(&aa->aa_oaps); *reqp = req; + niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); + CDEBUG(D_RPCTRACE, "brw rpc %p - object " DOSTID " offset %lld<>%lld\n", + req, POSTID(&oa->o_oi), niobuf[0].rnb_offset, + niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len); + return 0; out: @@ -1463,7 +1397,8 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, POSTID(&oa->o_oi), pga[0]->off, - pga[page_count-1]->off + pga[page_count-1]->count - 1); + pga[page_count - 1]->off + + pga[page_count - 1]->count - 1); CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n", client_cksum, client_cksum_type, server_cksum, cksum_type, new_cksum); @@ -1565,7 +1500,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) char *router = ""; enum cksum_type cksum_type; - cksum_type = cksum_type_unpack(body->oa.o_valid&OBD_MD_FLFLAGS ? + cksum_type = cksum_type_unpack(body->oa.o_valid & + OBD_MD_FLFLAGS ? 
@@ -1565,7 +1500,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                char *router = "";
                enum cksum_type cksum_type;

-               cksum_type = cksum_type_unpack(body->oa.o_valid&OBD_MD_FLFLAGS ?
+               cksum_type = cksum_type_unpack(body->oa.o_valid &
+                                              OBD_MD_FLFLAGS ?
                                               body->oa.o_flags : 0);
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
@@ -1794,7 +1730,8 @@ static int brw_interpret(const struct lu_env *env,
                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
-                       loff_t last_off = last->oap_count + last->oap_obj_off;
+                       loff_t last_off = last->oap_count + last->oap_obj_off +
+                                         last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size
@@ -1812,11 +1749,14 @@ static int brw_interpret(const struct lu_env *env,
                }

                if (valid != 0)
-                       cl_object_attr_set(env, obj, attr, valid);
+                       cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        kmem_cache_free(obdo_cachep, aa->aa_oa);

+       if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
+               osc_inc_unstable_pages(req);
+
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
@@ -1847,21 +1787,21 @@ static int brw_interpret(const struct lu_env *env,

static void brw_commit(struct ptlrpc_request *req)
{
-       spin_lock(&req->rq_lock);
        /*
         * If osc_inc_unstable_pages (via osc_extent_finish) races with
         * this called via the rq_commit_cb, I need to ensure
         * osc_dec_unstable_pages is still called. Otherwise unstable
         * pages may be leaked.
         */
-       if (req->rq_unstable) {
+       spin_lock(&req->rq_lock);
+       if (unlikely(req->rq_unstable)) {
+               req->rq_unstable = 0;
                spin_unlock(&req->rq_lock);
                osc_dec_unstable_pages(req);
-               spin_lock(&req->rq_lock);
        } else {
                req->rq_committed = 1;
+               spin_unlock(&req->rq_lock);
        }
-       spin_unlock(&req->rq_lock);
}
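The reworked brw_commit() above pairs the rq_unstable and rq_committed flags so that whichever of the extent-finish path and the transaction-commit callback runs second performs the single osc_dec_unstable_pages() call, and rq_lock is dropped before that accounting runs. The idea can be modeled in userspace as below; the demo_* names are made up, a pthread mutex stands in for rq_lock, and the extent-finish side is an inference from the hunk, not a copy of the driver's code.

#include <pthread.h>
#include <stdio.h>

/* Hypothetical model of the two request flags guarded by rq_lock. */
struct demo_req {
        pthread_mutex_t lock;
        int unstable;           /* set by the extent-finish path */
        int committed;          /* set by the commit callback */
        long unstable_pages;
};

static void demo_dec_unstable(struct demo_req *req)
{
        req->unstable_pages = 0;        /* release the unstable-page count */
}

/* Extent-finish side: either defer cleanup to the commit cb or do it now. */
static void demo_extent_finish(struct demo_req *req)
{
        pthread_mutex_lock(&req->lock);
        if (req->committed) {
                pthread_mutex_unlock(&req->lock);
                demo_dec_unstable(req); /* commit already ran: clean up now */
        } else {
                req->unstable = 1;      /* commit cb will clean up later */
                pthread_mutex_unlock(&req->lock);
        }
}

/* Commit-callback side, mirroring the structure of the reworked brw_commit(). */
static void demo_commit(struct demo_req *req)
{
        pthread_mutex_lock(&req->lock);
        if (req->unstable) {
                req->unstable = 0;
                pthread_mutex_unlock(&req->lock);
                demo_dec_unstable(req); /* extent finish already ran */
        } else {
                req->committed = 1;     /* extent finish will clean up */
                pthread_mutex_unlock(&req->lock);
        }
}

int main(void)
{
        struct demo_req req = {
                .lock = PTHREAD_MUTEX_INITIALIZER,
                .unstable_pages = 256,
        };

        /* Either ordering releases unstable_pages exactly once. */
        demo_extent_finish(&req);
        demo_commit(&req);
        printf("unstable pages left: %ld\n", req.unstable_pages);
        return 0;
}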
/**
@@ -1881,13 +1821,13 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        struct osc_async_page *tmp;
        struct cl_req *clerq = NULL;
        enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
-       struct ldlm_lock *lock = NULL;
        struct cl_req_attr *crattr = NULL;
        u64 starting_offset = OBD_OBJECT_EOF;
        u64 ending_offset = 0;
        int mpflag = 0;
        int mem_tight = 0;
        int page_count = 0;
+       bool soft_sync = false;
        int i;
        int rc;
        struct ost_body *body;
@@ -1915,6 +1855,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                }
        }

+       soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

@@ -1947,10 +1888,11 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                                rc = PTR_ERR(clerq);
                                goto out;
                        }
-                       lock = oap->oap_ldlm_lock;
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
+               if (soft_sync)
+                       oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
@@ -1964,10 +1906,6 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        LASSERT(clerq);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);
-       if (lock) {
-               oa->o_handle = lock->l_remote_handle;
-               oa->o_valid |= OBD_MD_FLHANDLE;
-       }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
@@ -1998,7 +1936,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
        cl_req_attr_set(env, clerq, crattr,
-                       OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
+                       OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

@@ -2044,7 +1982,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        }
        spin_unlock(&cli->cl_loi_list_lock);

-       DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
+       DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

@@ -2116,27 +2054,6 @@ static int osc_set_data_with_check(struct lustre_handle *lockh,
        return set;
}

-/* find any ldlm lock of the inode in osc
- * return 0    not find
- *        1    find one
- *        < 0  error
- */
-static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
-                          ldlm_iterator_t replace, void *data)
-{
-       struct ldlm_res_id res_id;
-       struct obd_device *obd = class_exp2obd(exp);
-       int rc = 0;
-
-       ostid_build_res_name(&lsm->lsm_oi, &res_id);
-       rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
-       if (rc == LDLM_ITER_STOP)
-               return 1;
-       if (rc == LDLM_ITER_CONTINUE)
-               return 0;
-       return rc;
-}
-
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, enum ldlm_mode mode,
@@ -2586,71 +2503,6 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
        return rc;
}

-/* Retrieve object striping information.
- *
- * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
- * the maximum number of OST indices which will fit in the user buffer.
- * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
- */
-static int osc_getstripe(struct lov_stripe_md *lsm,
-                        struct lov_user_md __user *lump)
-{
-       /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
-       struct lov_user_md_v3 lum, *lumk;
-       struct lov_user_ost_data_v1 *lmm_objects;
-       int rc = 0, lum_size;
-
-       if (!lsm)
-               return -ENODATA;
-
-       /* we only need the header part from user space to get lmm_magic and
-        * lmm_stripe_count, (the header part is common to v1 and v3)
-        */
-       lum_size = sizeof(struct lov_user_md_v1);
-       if (copy_from_user(&lum, lump, lum_size))
-               return -EFAULT;
-
-       if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
-           (lum.lmm_magic != LOV_USER_MAGIC_V3))
-               return -EINVAL;
-
-       /* lov_user_md_vX and lov_mds_md_vX must have the same size */
-       LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
-       LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
-       LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
-
-       /* we can use lov_mds_md_size() to compute lum_size
-        * because lov_user_md_vX and lov_mds_md_vX have the same size
-        */
-       if (lum.lmm_stripe_count > 0) {
-               lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
-               lumk = kzalloc(lum_size, GFP_NOFS);
-               if (!lumk)
-                       return -ENOMEM;
-
-               if (lum.lmm_magic == LOV_USER_MAGIC_V1)
-                       lmm_objects =
-                           &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
-               else
-                       lmm_objects = &(lumk->lmm_objects[0]);
-               lmm_objects->l_ost_oi = lsm->lsm_oi;
-       } else {
-               lum_size = lov_mds_md_size(0, lum.lmm_magic);
-               lumk = &lum;
-       }
-
-       lumk->lmm_oi = lsm->lsm_oi;
-       lumk->lmm_stripe_count = 1;
-
-       if (copy_to_user(lump, lumk, lum_size))
-               rc = -EFAULT;
-
-       if (lumk != &lum)
-               kfree(lumk);
-
-       return rc;
-}
-
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg)
{
@@ -2664,57 +2516,6 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                return -EINVAL;
        }
        switch (cmd) {
-       case OBD_IOC_LOV_GET_CONFIG: {
-               char *buf;
-               struct lov_desc *desc;
-               struct obd_uuid uuid;
-
-               buf = NULL;
-               len = 0;
-               if (obd_ioctl_getdata(&buf, &len, uarg)) {
-                       err = -EINVAL;
-                       goto out;
-               }
-
-               data = (struct obd_ioctl_data *)buf;
-
-               if (sizeof(*desc) > data->ioc_inllen1) {
-                       obd_ioctl_freedata(buf, len);
-                       err = -EINVAL;
-                       goto out;
-               }
-
-               if (data->ioc_inllen2 < sizeof(uuid)) {
-                       obd_ioctl_freedata(buf, len);
-                       err = -EINVAL;
-                       goto out;
-               }
-
-               desc = (struct lov_desc *)data->ioc_inlbuf1;
-               desc->ld_tgt_count = 1;
-               desc->ld_active_tgt_count = 1;
-               desc->ld_default_stripe_count = 1;
-               desc->ld_default_stripe_size = 0;
-               desc->ld_default_stripe_offset = 0;
-               desc->ld_pattern = 0;
-               memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
-
-               memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
-
-               err = copy_to_user(uarg, buf, len);
-               if (err)
-                       err = -EFAULT;
-               obd_ioctl_freedata(buf, len);
-               goto out;
-       }
-       case LL_IOC_LOV_SETSTRIPE:
-               err = obd_alloc_memmd(exp, karg);
-               if (err > 0)
-                       err = 0;
-               goto out;
-       case LL_IOC_LOV_GETSTRIPE:
-               err = osc_getstripe(karg, uarg);
-               goto out;
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
@@ -2749,51 +2550,7 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
        if (!vallen || !val)
                return -EFAULT;

-       if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
-               __u32 *stripe = val;
-               *vallen = sizeof(*stripe);
-               *stripe = 0;
-               return 0;
-       } else if (KEY_IS(KEY_LAST_ID)) {
-               struct ptlrpc_request *req;
-               u64 *reply;
-               char *tmp;
-               int rc;
-
-               req = ptlrpc_request_alloc(class_exp2cliimp(exp),
-                                          &RQF_OST_GET_INFO_LAST_ID);
-               if (!req)
-                       return -ENOMEM;
-
-               req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
-                                    RCL_CLIENT, keylen);
-               rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
-               if (rc) {
-                       ptlrpc_request_free(req);
-                       return rc;
-               }
-
-               tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
-               memcpy(tmp, key, keylen);
-
-               req->rq_no_delay = 1;
-               req->rq_no_resend = 1;
-               ptlrpc_request_set_replen(req);
-               rc = ptlrpc_queue_wait(req);
-               if (rc)
-                       goto out;
-
-               reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
-               if (!reply) {
-                       rc = -EPROTO;
-                       goto out;
-               }
-
-               *((u64 *)val) = *reply;
-out:
-               ptlrpc_req_finished(req);
-               return rc;
-       } else if (KEY_IS(KEY_FIEMAP)) {
+       if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key = key;
                struct ldlm_res_id res_id;
                ldlm_policy_data_t policy;
@@ -2931,11 +2688,11 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
-               int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
-               int target = *(int *)val;
+               long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
+               long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
-               *(int *)val -= nr;
+               *(long *)val -= nr;
                return 0;
        }

@@ -3014,8 +2771,9 @@ static int osc_reconnect(const struct lu_env *env,
                long lost_grant;

                spin_lock(&cli->cl_loi_list_lock);
-               data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
-                               2 * cli_brw_size(obd);
+               data->ocd_grant = (cli->cl_avail_grant +
+                                  (cli->cl_dirty_pages << PAGE_SHIFT)) ?:
+                                  2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
@@ -3346,7 +3104,6 @@ static struct obd_ops osc_obd_ops = {
        .disconnect     = osc_disconnect,
        .statfs         = osc_statfs,
        .statfs_async   = osc_statfs_async,
-       .packmd         = osc_packmd,
        .unpackmd       = osc_unpackmd,
        .create         = osc_create,
        .destroy        = osc_destroy,
@@ -3354,7 +3111,6 @@ static struct obd_ops osc_obd_ops = {
        .getattr_async  = osc_getattr_async,
        .setattr        = osc_setattr,
        .setattr_async  = osc_setattr_async,
-       .find_cbdata    = osc_find_cbdata,
        .iocontrol      = osc_iocontrol,
        .get_info       = osc_get_info,
        .set_info_async = osc_set_info_async,
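Finally, the osc_reconnect() hunk above now derives the requested grant from cl_dirty_pages expressed in bytes. A toy version of that calculation follows; the demo_* names are hypothetical, a 4 KiB page size is assumed, and cli_brw_size() is replaced by a constant for the sake of a self-contained sketch.

#include <stdio.h>

#define DEMO_PAGE_SHIFT 12                      /* assume 4 KiB pages */
#define DEMO_BRW_SIZE   (1UL << 20)             /* stand-in for cli_brw_size() */

/*
 * On reconnect the client asks the server to re-establish enough grant to
 * cover what it already holds plus its dirty pages; if both are zero it
 * falls back to two full bulk RPCs worth of space.
 */
static unsigned long demo_reconnect_grant(long avail_grant,
                                          unsigned long dirty_pages)
{
        unsigned long want = avail_grant + (dirty_pages << DEMO_PAGE_SHIFT);

        return want ? want : 2 * DEMO_BRW_SIZE;
}

int main(void)
{
        printf("grant request: %lu bytes\n", demo_reconnect_grant(0, 1024));
        printf("grant request (idle client): %lu bytes\n",
               demo_reconnect_grant(0, 0));
        return 0;
}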