OpenZFS 9689 - zfs range lock code should not be zpl-specific

The ZFS range locking code in zfs_rlock.c/h depends on ZPL-specific
data structures, specifically znode_t.  However, it's also used by
the ZVOL code, which uses a "dummy" znode_t to pass to the range
locking code.

We should clean this up so that the range locking code is generic
and can be used equally by ZPL and ZVOL, and also can be used by
future consumers that may need to run in userland (libzpool) as
well as the kernel.

Porting notes:
* Added missing sys/avl.h include to sys/zfs_rlock.h.
* Removed 'dbuf is within the locked range' ASSERTs from dmu_sync().
  This was needed because ztest does not yet use a locked_range_t.
* Removed "Approved by:" tag requirement from OpenZFS commit
  check to prevent needless warnings when integrating changes
  which has not been merged to illumos.
* Reverted free_list range lock changes which were originally
  needed to defer the cv_destroy() which was called immediately
  after cv_broadcast().  With d2733258 this should be safe but
  if not we may need to reintroduce this logic.
* Reverts: The following two commits were reverted and squashed in
  to this change in order to make it easier to apply OpenZFS 9689.
  - d88895a0, which removed the dummy znode from zvol_state
  - e3a07cd0, which updated ztest to use range locks
* Preserved optimized rangelock comparison function.  Preserved the
  rangelock free list.  The cv_destroy() function will block waiting
  for all processes in cv_wait() to be scheduled and drop their
  reference.  This is done to ensure it's safe to free the condition
  variable.  However, blocking while holding the rl->rl_lock mutex
  can result in a deadlock on Linux.  A free list is introduced to
  defer the cv_destroy() and kmem_free() until after the mutex is
  released.

Authored by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Brian Behlendorf <behlendorf1@llnl.gov>
Reviewed by: Serapheim Dimitropoulos <serapheim.dimitro@delphix.com>
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Brad Lewis <brad.lewis@delphix.com>
Ported-by: Brian Behlendorf <behlendorf1@llnl.gov>

OpenZFS-issue: https://illumos.org/issues/9689
OpenZFS-commit: https://github.com/openzfs/openzfs/pull/680
External-issue: DLPX-58662
Closes #7980
This commit is contained in:
Matt Ahrens 2018-10-01 15:13:12 -07:00 committed by Brian Behlendorf
parent 50a343d85c
commit 5d43cc9a59
10 changed files with 484 additions and 595 deletions

View File

@ -104,7 +104,6 @@
#include <sys/zio.h>
#include <sys/zil.h>
#include <sys/zil_impl.h>
#include <sys/zfs_rlock.h>
#include <sys/vdev_impl.h>
#include <sys/vdev_file.h>
#include <sys/spa_impl.h>
@ -258,6 +257,17 @@ typedef struct bufwad {
uint64_t bw_data;
} bufwad_t;
/*
* It would be better to use a rangelock_t per object. Unfortunately
* the rangelock_t is not a drop-in replacement for rl_t, because we
* still need to map from object ID to rangelock_t.
*/
typedef enum {
RL_READER,
RL_WRITER,
RL_APPEND
} rl_type_t;
typedef struct rll {
void *rll_writer;
int rll_readers;
@ -265,10 +275,12 @@ typedef struct rll {
kcondvar_t rll_cv;
} rll_t;
typedef struct zll {
list_t z_list;
kmutex_t z_lock;
} zll_t;
typedef struct rl {
uint64_t rl_object;
uint64_t rl_offset;
uint64_t rl_size;
rll_t *rl_lock;
} rl_t;
#define ZTEST_RANGE_LOCKS 64
#define ZTEST_OBJECT_LOCKS 64
@ -301,7 +313,7 @@ typedef struct ztest_ds {
char zd_name[ZFS_MAX_DATASET_NAME_LEN];
kmutex_t zd_dirobj_lock;
rll_t zd_object_lock[ZTEST_OBJECT_LOCKS];
zll_t zd_range_lock[ZTEST_RANGE_LOCKS];
rll_t zd_range_lock[ZTEST_RANGE_LOCKS];
} ztest_ds_t;
/*
@ -1318,100 +1330,6 @@ ztest_dmu_objset_own(const char *name, dmu_objset_type_t type,
return (err);
}
/*
* Object and range lock mechanics
*/
typedef struct {
list_node_t z_lnode;
zfs_refcount_t z_refcnt;
uint64_t z_object;
zfs_rlock_t z_range_lock;
} ztest_znode_t;
typedef struct {
rl_t *z_rl;
ztest_znode_t *z_ztznode;
} ztest_zrl_t;
static ztest_znode_t *
ztest_znode_init(uint64_t object)
{
ztest_znode_t *zp = umem_alloc(sizeof (*zp), UMEM_NOFAIL);
list_link_init(&zp->z_lnode);
zfs_refcount_create(&zp->z_refcnt);
zp->z_object = object;
zfs_rlock_init(&zp->z_range_lock);
return (zp);
}
static void
ztest_znode_fini(ztest_znode_t *zp)
{
ASSERT(zfs_refcount_is_zero(&zp->z_refcnt));
zfs_rlock_destroy(&zp->z_range_lock);
zp->z_object = 0;
zfs_refcount_destroy(&zp->z_refcnt);
list_link_init(&zp->z_lnode);
umem_free(zp, sizeof (*zp));
}
static void
ztest_zll_init(zll_t *zll)
{
mutex_init(&zll->z_lock, NULL, MUTEX_DEFAULT, NULL);
list_create(&zll->z_list, sizeof (ztest_znode_t),
offsetof(ztest_znode_t, z_lnode));
}
static void
ztest_zll_destroy(zll_t *zll)
{
list_destroy(&zll->z_list);
mutex_destroy(&zll->z_lock);
}
#define RL_TAG "range_lock"
static ztest_znode_t *
ztest_znode_get(ztest_ds_t *zd, uint64_t object)
{
zll_t *zll = &zd->zd_range_lock[object & (ZTEST_OBJECT_LOCKS - 1)];
ztest_znode_t *zp = NULL;
mutex_enter(&zll->z_lock);
for (zp = list_head(&zll->z_list); (zp);
zp = list_next(&zll->z_list, zp)) {
if (zp->z_object == object) {
zfs_refcount_add(&zp->z_refcnt, RL_TAG);
break;
}
}
if (zp == NULL) {
zp = ztest_znode_init(object);
zfs_refcount_add(&zp->z_refcnt, RL_TAG);
list_insert_head(&zll->z_list, zp);
}
mutex_exit(&zll->z_lock);
return (zp);
}
static void
ztest_znode_put(ztest_ds_t *zd, ztest_znode_t *zp)
{
zll_t *zll = NULL;
ASSERT3U(zp->z_object, !=, 0);
zll = &zd->zd_range_lock[zp->z_object & (ZTEST_OBJECT_LOCKS - 1)];
mutex_enter(&zll->z_lock);
zfs_refcount_remove(&zp->z_refcnt, RL_TAG);
if (zfs_refcount_is_zero(&zp->z_refcnt)) {
list_remove(&zll->z_list, zp);
ztest_znode_fini(zp);
}
mutex_exit(&zll->z_lock);
}
static void
ztest_rll_init(rll_t *rll)
{
@ -1484,37 +1402,33 @@ ztest_object_unlock(ztest_ds_t *zd, uint64_t object)
ztest_rll_unlock(rll);
}
static ztest_zrl_t *
ztest_zrl_init(rl_t *rl, ztest_znode_t *zp)
{
ztest_zrl_t *zrl = umem_alloc(sizeof (*zrl), UMEM_NOFAIL);
zrl->z_rl = rl;
zrl->z_ztznode = zp;
return (zrl);
}
static void
ztest_zrl_fini(ztest_zrl_t *zrl)
{
umem_free(zrl, sizeof (*zrl));
}
static ztest_zrl_t *
static rl_t *
ztest_range_lock(ztest_ds_t *zd, uint64_t object, uint64_t offset,
uint64_t size, rl_type_t type)
{
ztest_znode_t *zp = ztest_znode_get(zd, object);
rl_t *rl = zfs_range_lock(&zp->z_range_lock, offset,
size, type);
return (ztest_zrl_init(rl, zp));
uint64_t hash = object ^ (offset % (ZTEST_RANGE_LOCKS + 1));
rll_t *rll = &zd->zd_range_lock[hash & (ZTEST_RANGE_LOCKS - 1)];
rl_t *rl;
rl = umem_alloc(sizeof (*rl), UMEM_NOFAIL);
rl->rl_object = object;
rl->rl_offset = offset;
rl->rl_size = size;
rl->rl_lock = rll;
ztest_rll_lock(rll, type);
return (rl);
}
static void
ztest_range_unlock(ztest_ds_t *zd, ztest_zrl_t *zrl)
ztest_range_unlock(rl_t *rl)
{
zfs_range_unlock(zrl->z_rl);
ztest_znode_put(zd, zrl->z_ztznode);
ztest_zrl_fini(zrl);
rll_t *rll = rl->rl_lock;
ztest_rll_unlock(rll);
umem_free(rl, sizeof (*rl));
}
static void
@ -1536,7 +1450,7 @@ ztest_zd_init(ztest_ds_t *zd, ztest_shared_ds_t *szd, objset_t *os)
ztest_rll_init(&zd->zd_object_lock[l]);
for (l = 0; l < ZTEST_RANGE_LOCKS; l++)
ztest_zll_init(&zd->zd_range_lock[l]);
ztest_rll_init(&zd->zd_range_lock[l]);
}
static void
@ -1551,7 +1465,7 @@ ztest_zd_fini(ztest_ds_t *zd)
ztest_rll_destroy(&zd->zd_object_lock[l]);
for (l = 0; l < ZTEST_RANGE_LOCKS; l++)
ztest_zll_destroy(&zd->zd_range_lock[l]);
ztest_rll_destroy(&zd->zd_range_lock[l]);
}
#define TXG_MIGHTWAIT (ztest_random(10) == 0 ? TXG_NOWAIT : TXG_WAIT)
@ -1967,7 +1881,7 @@ ztest_replay_write(void *arg1, void *arg2, boolean_t byteswap)
dmu_tx_t *tx;
dmu_buf_t *db;
arc_buf_t *abuf = NULL;
ztest_zrl_t *rl;
rl_t *rl;
if (byteswap)
byteswap_uint64_array(lr, sizeof (*lr));
@ -2016,7 +1930,7 @@ ztest_replay_write(void *arg1, void *arg2, boolean_t byteswap)
if (abuf != NULL)
dmu_return_arcbuf(abuf);
dmu_buf_rele(db, FTAG);
ztest_range_unlock(zd, rl);
ztest_range_unlock(rl);
ztest_object_unlock(zd, lr->lr_foid);
return (ENOSPC);
}
@ -2074,7 +1988,7 @@ ztest_replay_write(void *arg1, void *arg2, boolean_t byteswap)
dmu_tx_commit(tx);
ztest_range_unlock(zd, rl);
ztest_range_unlock(rl);
ztest_object_unlock(zd, lr->lr_foid);
return (0);
@ -2088,7 +2002,7 @@ ztest_replay_truncate(void *arg1, void *arg2, boolean_t byteswap)
objset_t *os = zd->zd_os;
dmu_tx_t *tx;
uint64_t txg;
ztest_zrl_t *rl;
rl_t *rl;
if (byteswap)
byteswap_uint64_array(lr, sizeof (*lr));
@ -2103,7 +2017,7 @@ ztest_replay_truncate(void *arg1, void *arg2, boolean_t byteswap)
txg = ztest_tx_assign(tx, TXG_WAIT, FTAG);
if (txg == 0) {
ztest_range_unlock(zd, rl);
ztest_range_unlock(rl);
ztest_object_unlock(zd, lr->lr_foid);
return (ENOSPC);
}
@ -2115,7 +2029,7 @@ ztest_replay_truncate(void *arg1, void *arg2, boolean_t byteswap)
dmu_tx_commit(tx);
ztest_range_unlock(zd, rl);
ztest_range_unlock(rl);
ztest_object_unlock(zd, lr->lr_foid);
return (0);
@ -2222,30 +2136,23 @@ zil_replay_func_t *ztest_replay_vector[TX_MAX_TYPE] = {
/*
* ZIL get_data callbacks
*/
typedef struct ztest_zgd_private {
ztest_ds_t *z_zd;
ztest_zrl_t *z_rl;
uint64_t z_object;
} ztest_zgd_private_t;
static void
ztest_get_done(zgd_t *zgd, int error)
{
ztest_zgd_private_t *zzp = zgd->zgd_private;
ztest_ds_t *zd = zzp->z_zd;
uint64_t object = zzp->z_object;
ztest_ds_t *zd = zgd->zgd_private;
uint64_t object = ((rl_t *)zgd->zgd_lr)->rl_object;
if (zgd->zgd_db)
dmu_buf_rele(zgd->zgd_db, zgd);
ztest_range_unlock(zd, zzp->z_rl);
ztest_range_unlock((rl_t *)zgd->zgd_lr);
ztest_object_unlock(zd, object);
if (error == 0 && zgd->zgd_bp)
zil_lwb_add_block(zgd->zgd_lwb, zgd->zgd_bp);
umem_free(zgd, sizeof (*zgd));
umem_free(zzp, sizeof (*zzp));
}
static int
@ -2263,7 +2170,6 @@ ztest_get_data(void *arg, lr_write_t *lr, char *buf, struct lwb *lwb,
dmu_buf_t *db;
zgd_t *zgd;
int error;
ztest_zgd_private_t *zgd_private;
ASSERT3P(lwb, !=, NULL);
ASSERT3P(zio, !=, NULL);
@ -2290,15 +2196,11 @@ ztest_get_data(void *arg, lr_write_t *lr, char *buf, struct lwb *lwb,
zgd = umem_zalloc(sizeof (*zgd), UMEM_NOFAIL);
zgd->zgd_lwb = lwb;
zgd_private = umem_zalloc(sizeof (ztest_zgd_private_t), UMEM_NOFAIL);
zgd_private->z_zd = zd;
zgd_private->z_object = object;
zgd->zgd_private = zgd_private;
zgd->zgd_private = zd;
if (buf != NULL) { /* immediate write */
zgd_private->z_rl = ztest_range_lock(zd, object, offset, size,
RL_READER);
zgd->zgd_rl = zgd_private->z_rl->z_rl;
zgd->zgd_lr = (struct locked_range *)ztest_range_lock(zd,
object, offset, size, RL_READER);
error = dmu_read(os, object, offset, size, buf,
DMU_READ_NO_PREFETCH);
@ -2312,9 +2214,8 @@ ztest_get_data(void *arg, lr_write_t *lr, char *buf, struct lwb *lwb,
offset = 0;
}
zgd_private->z_rl = ztest_range_lock(zd, object, offset, size,
RL_READER);
zgd->zgd_rl = zgd_private->z_rl->z_rl;
zgd->zgd_lr = (struct locked_range *)ztest_range_lock(zd,
object, offset, size, RL_READER);
error = dmu_buf_hold(os, object, offset, zgd, &db,
DMU_READ_NO_PREFETCH);
@ -2560,7 +2461,7 @@ ztest_prealloc(ztest_ds_t *zd, uint64_t object, uint64_t offset, uint64_t size)
objset_t *os = zd->zd_os;
dmu_tx_t *tx;
uint64_t txg;
ztest_zrl_t *rl;
rl_t *rl;
txg_wait_synced(dmu_objset_pool(os), 0);
@ -2581,7 +2482,7 @@ ztest_prealloc(ztest_ds_t *zd, uint64_t object, uint64_t offset, uint64_t size)
(void) dmu_free_long_range(os, object, offset, size);
}
ztest_range_unlock(zd, rl);
ztest_range_unlock(rl);
ztest_object_unlock(zd, object);
}

View File

@ -73,6 +73,7 @@ struct arc_buf;
struct zio_prop;
struct sa_handle;
struct dsl_crypto_params;
struct locked_range;
typedef struct objset objset_t;
typedef struct dmu_tx dmu_tx_t;
@ -1034,7 +1035,7 @@ typedef struct zgd {
struct lwb *zgd_lwb;
struct blkptr *zgd_bp;
dmu_buf_t *zgd_db;
struct rl *zgd_rl;
struct locked_range *zgd_lr;
void *zgd_private;
} zgd_t;

View File

@ -22,6 +22,9 @@
* Copyright 2006 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
/*
* Copyright (c) 2018 by Delphix. All rights reserved.
*/
#ifndef _SYS_FS_ZFS_RLOCK_H
#define _SYS_FS_ZFS_RLOCK_H
@ -30,85 +33,46 @@
extern "C" {
#endif
#include <sys/list.h>
#include <sys/avl.h>
#ifdef _KERNEL
#include <sys/condvar.h>
#else
#include <sys/zfs_context.h>
#endif
typedef enum {
RL_READER,
RL_WRITER,
RL_APPEND
} rl_type_t;
} rangelock_type_t;
typedef struct zfs_rlock {
kmutex_t zr_mutex; /* protects changes to zr_avl */
avl_tree_t zr_avl; /* avl tree of range locks */
uint64_t *zr_size; /* points to znode->z_size */
uint_t *zr_blksz; /* points to znode->z_blksz */
uint64_t *zr_max_blksz; /* points to zfsvfs->z_max_blksz */
} zfs_rlock_t;
struct locked_range;
typedef struct rl {
zfs_rlock_t *r_zrl;
avl_node_t r_node; /* avl node link */
uint64_t r_off; /* file range offset */
uint64_t r_len; /* file range length */
uint_t r_cnt; /* range reference count in tree */
rl_type_t r_type; /* range type */
kcondvar_t r_wr_cv; /* cv for waiting writers */
kcondvar_t r_rd_cv; /* cv for waiting readers */
uint8_t r_proxy; /* acting for original range */
uint8_t r_write_wanted; /* writer wants to lock this range */
uint8_t r_read_wanted; /* reader wants to lock this range */
list_node_t rl_node; /* used for deferred release */
} rl_t;
typedef void (rangelock_cb_t)(struct locked_range *, void *);
/*
* Lock a range (offset, length) as either shared (RL_READER)
* or exclusive (RL_WRITER or RL_APPEND). RL_APPEND is a special type that
* is converted to RL_WRITER that specified to lock from the start of the
* end of file. Returns the range lock structure.
*/
rl_t *zfs_range_lock(zfs_rlock_t *zrl, uint64_t off, uint64_t len,
rl_type_t type);
typedef struct rangelock {
avl_tree_t rl_tree; /* contains locked_range_t */
kmutex_t rl_lock;
rangelock_cb_t *rl_cb;
void *rl_arg;
} rangelock_t;
/* Unlock range and destroy range lock structure. */
void zfs_range_unlock(rl_t *rl);
typedef struct locked_range {
rangelock_t *lr_rangelock; /* rangelock that this lock applies to */
avl_node_t lr_node; /* avl node link */
uint64_t lr_offset; /* file range offset */
uint64_t lr_length; /* file range length */
uint_t lr_count; /* range reference count in tree */
rangelock_type_t lr_type; /* range type */
kcondvar_t lr_write_cv; /* cv for waiting writers */
kcondvar_t lr_read_cv; /* cv for waiting readers */
uint8_t lr_proxy; /* acting for original range */
uint8_t lr_write_wanted; /* writer wants to lock this range */
uint8_t lr_read_wanted; /* reader wants to lock this range */
} locked_range_t;
/*
* Reduce range locked as RW_WRITER from whole file to specified range.
* Asserts the whole file was previously locked.
*/
void zfs_range_reduce(rl_t *rl, uint64_t off, uint64_t len);
void rangelock_init(rangelock_t *, rangelock_cb_t *, void *);
void rangelock_fini(rangelock_t *);
/*
* AVL comparison function used to order range locks
* Locks are ordered on the start offset of the range.
*/
int zfs_range_compare(const void *arg1, const void *arg2);
static inline void
zfs_rlock_init(zfs_rlock_t *zrl)
{
mutex_init(&zrl->zr_mutex, NULL, MUTEX_DEFAULT, NULL);
avl_create(&zrl->zr_avl, zfs_range_compare,
sizeof (rl_t), offsetof(rl_t, r_node));
zrl->zr_size = NULL;
zrl->zr_blksz = NULL;
zrl->zr_max_blksz = NULL;
}
static inline void
zfs_rlock_destroy(zfs_rlock_t *zrl)
{
avl_destroy(&zrl->zr_avl);
mutex_destroy(&zrl->zr_mutex);
}
locked_range_t *rangelock_enter(rangelock_t *,
uint64_t, uint64_t, rangelock_type_t);
void rangelock_exit(locked_range_t *);
void rangelock_reduce(locked_range_t *, uint64_t, uint64_t);
#ifdef __cplusplus
}

View File

@ -20,7 +20,7 @@
*/
/*
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2015 by Delphix. All rights reserved.
* Copyright (c) 2012, 2018 by Delphix. All rights reserved.
* Copyright 2016 Nexenta Systems, Inc. All rights reserved.
*/
@ -191,7 +191,7 @@ typedef struct znode {
krwlock_t z_parent_lock; /* parent lock for directories */
krwlock_t z_name_lock; /* "master" lock for dirent locks */
zfs_dirlock_t *z_dirlocks; /* directory entry lock list */
zfs_rlock_t z_range_lock; /* file range lock */
rangelock_t z_rangelock; /* file range locks */
uint8_t z_unlinked; /* file has been unlinked */
uint8_t z_atime_dirty; /* atime needs to be synced */
uint8_t z_zn_prefetch; /* Prefetch znodes? */

View File

@ -1924,11 +1924,6 @@ dmu_sync(zio_t *pio, uint64_t txg, dmu_sync_cb_t *done, zgd_t *zgd)
ASSERT(pio != NULL);
ASSERT(txg != 0);
/* dbuf is within the locked range */
ASSERT3U(db->db.db_offset, >=, zgd->zgd_rl->r_off);
ASSERT3U(db->db.db_offset + db->db.db_size, <=,
zgd->zgd_rl->r_off + zgd->zgd_rl->r_len);
SET_BOOKMARK(&zb, ds->ds_object,
db->db.db_object, db->db_level, db->db_blkid);

View File

@ -23,7 +23,7 @@
* Use is subject to license terms.
*/
/*
* Copyright (c) 2012 by Delphix. All rights reserved.
* Copyright (c) 2012, 2018 by Delphix. All rights reserved.
*/
/*
@ -34,9 +34,9 @@
* Interface
* ---------
* Defined in zfs_rlock.h but essentially:
* rl = zfs_range_lock(zp, off, len, lock_type);
* zfs_range_unlock(rl);
* zfs_range_reduce(rl, off, len);
* lr = rangelock_enter(zp, off, len, lock_type);
* rangelock_reduce(lr, off, len); // optional
* rangelock_exit(lr);
*
* AVL tree
* --------
@ -46,9 +46,10 @@
*
* Common case
* -----------
* The (hopefully) usual case is of no overlaps or contention for
* locks. On entry to zfs_lock_range() a rl_t is allocated; the tree
* searched that finds no overlap, and *this* rl_t is placed in the tree.
* The (hopefully) usual case is of no overlaps or contention for locks. On
* entry to rangelock_enter(), a locked_range_t is allocated; the tree
* searched that finds no overlap, and *this* locked_range_t is placed in the
* tree.
*
* Overlaps/Reference counting/Proxy locks
* ---------------------------------------
@ -87,68 +88,85 @@
*
* Grow block handling
* -------------------
* ZFS supports multiple block sizes currently up to 128K. The smallest
* ZFS supports multiple block sizes, up to 16MB. The smallest
* block size is used for the file which is grown as needed. During this
* growth all other writers and readers must be excluded.
* So if the block size needs to be grown then the whole file is
* exclusively locked, then later the caller will reduce the lock
* range to just the range to be written using zfs_reduce_range.
* range to just the range to be written using rangelock_reduce().
*/
#include <sys/zfs_context.h>
#include <sys/zfs_rlock.h>
#include <sys/sysmacros.h>
/*
* AVL comparison function used to order range locks
* Locks are ordered on the start offset of the range.
*/
static int
rangelock_compare(const void *arg1, const void *arg2)
{
const locked_range_t *rl1 = (const locked_range_t *)arg1;
const locked_range_t *rl2 = (const locked_range_t *)arg2;
return (AVL_CMP(rl1->lr_offset, rl2->lr_offset));
}
/*
* The callback is invoked when acquiring a RL_WRITER or RL_APPEND lock.
* It must convert RL_APPEND to RL_WRITER (starting at the end of the file),
* and may increase the range that's locked for RL_WRITER.
*/
void
rangelock_init(rangelock_t *rl, rangelock_cb_t *cb, void *arg)
{
mutex_init(&rl->rl_lock, NULL, MUTEX_DEFAULT, NULL);
avl_create(&rl->rl_tree, rangelock_compare,
sizeof (locked_range_t), offsetof(locked_range_t, lr_node));
rl->rl_cb = cb;
rl->rl_arg = arg;
}
void
rangelock_fini(rangelock_t *rl)
{
mutex_destroy(&rl->rl_lock);
avl_destroy(&rl->rl_tree);
}
/*
* Check if a write lock can be grabbed, or wait and recheck until available.
*/
static void
zfs_range_lock_writer(zfs_rlock_t *zrl, rl_t *new)
rangelock_enter_writer(rangelock_t *rl, locked_range_t *new)
{
avl_tree_t *tree = &zrl->zr_avl;
rl_t *rl;
avl_tree_t *tree = &rl->rl_tree;
locked_range_t *lr;
avl_index_t where;
uint64_t end_size;
uint64_t off = new->r_off;
uint64_t len = new->r_len;
uint64_t orig_off = new->lr_offset;
uint64_t orig_len = new->lr_length;
rangelock_type_t orig_type = new->lr_type;
for (;;) {
/*
* Range locking is also used by zvol. However, for zvol, we
* don't need to append or grow blocksize, so skip that
* processing.
*
* Yes, this is ugly, and would be solved by not handling
* grow or append in range lock code. If that was done then
* we could make the range locking code generically available
* to other non-zfs consumers.
* Call callback which can modify new->r_off,len,type.
* Note, the callback is used by the ZPL to handle appending
* and changing blocksizes. It isn't needed for zvols.
*/
if (zrl->zr_size) { /* caller is ZPL */
/*
* If in append mode pick up the current end of file.
* This is done under z_range_lock to avoid races.
*/
if (new->r_type == RL_APPEND)
new->r_off = *zrl->zr_size;
/*
* If we need to grow the block size then grab the whole
* file range. This is also done under z_range_lock to
* avoid races.
*/
end_size = MAX(*zrl->zr_size, new->r_off + len);
if (end_size > *zrl->zr_blksz &&
(!ISP2(*zrl->zr_blksz) ||
*zrl->zr_blksz < *zrl->zr_max_blksz)) {
new->r_off = 0;
new->r_len = UINT64_MAX;
}
if (rl->rl_cb != NULL) {
rl->rl_cb(new, rl->rl_arg);
}
/*
* If the type was APPEND, the callback must convert it to
* WRITER.
*/
ASSERT3U(new->lr_type, ==, RL_WRITER);
/*
* First check for the usual case of no locks
*/
if (avl_numnodes(tree) == 0) {
new->r_type = RL_WRITER; /* convert to writer */
avl_add(tree, new);
return;
}
@ -156,31 +174,33 @@ zfs_range_lock_writer(zfs_rlock_t *zrl, rl_t *new)
/*
* Look for any locks in the range.
*/
rl = avl_find(tree, new, &where);
if (rl)
lr = avl_find(tree, new, &where);
if (lr != NULL)
goto wait; /* already locked at same offset */
rl = (rl_t *)avl_nearest(tree, where, AVL_AFTER);
if (rl && (rl->r_off < new->r_off + new->r_len))
lr = (locked_range_t *)avl_nearest(tree, where, AVL_AFTER);
if (lr != NULL &&
lr->lr_offset < new->lr_offset + new->lr_length)
goto wait;
rl = (rl_t *)avl_nearest(tree, where, AVL_BEFORE);
if (rl && rl->r_off + rl->r_len > new->r_off)
lr = (locked_range_t *)avl_nearest(tree, where, AVL_BEFORE);
if (lr != NULL &&
lr->lr_offset + lr->lr_length > new->lr_offset)
goto wait;
new->r_type = RL_WRITER; /* convert possible RL_APPEND */
avl_insert(tree, new, where);
return;
wait:
if (!rl->r_write_wanted) {
cv_init(&rl->r_wr_cv, NULL, CV_DEFAULT, NULL);
rl->r_write_wanted = B_TRUE;
if (!lr->lr_write_wanted) {
cv_init(&lr->lr_write_cv, NULL, CV_DEFAULT, NULL);
lr->lr_write_wanted = B_TRUE;
}
cv_wait(&rl->r_wr_cv, &zrl->zr_mutex);
cv_wait(&lr->lr_write_cv, &rl->rl_lock);
/* reset to original */
new->r_off = off;
new->r_len = len;
new->lr_offset = orig_off;
new->lr_length = orig_len;
new->lr_type = orig_type;
}
}
@ -188,29 +208,29 @@ wait:
* If this is an original (non-proxy) lock then replace it by
* a proxy and return the proxy.
*/
static rl_t *
zfs_range_proxify(avl_tree_t *tree, rl_t *rl)
static locked_range_t *
rangelock_proxify(avl_tree_t *tree, locked_range_t *lr)
{
rl_t *proxy;
locked_range_t *proxy;
if (rl->r_proxy)
return (rl); /* already a proxy */
if (lr->lr_proxy)
return (lr); /* already a proxy */
ASSERT3U(rl->r_cnt, ==, 1);
ASSERT(rl->r_write_wanted == B_FALSE);
ASSERT(rl->r_read_wanted == B_FALSE);
avl_remove(tree, rl);
rl->r_cnt = 0;
ASSERT3U(lr->lr_count, ==, 1);
ASSERT(lr->lr_write_wanted == B_FALSE);
ASSERT(lr->lr_read_wanted == B_FALSE);
avl_remove(tree, lr);
lr->lr_count = 0;
/* create a proxy range lock */
proxy = kmem_alloc(sizeof (rl_t), KM_SLEEP);
proxy->r_off = rl->r_off;
proxy->r_len = rl->r_len;
proxy->r_cnt = 1;
proxy->r_type = RL_READER;
proxy->r_proxy = B_TRUE;
proxy->r_write_wanted = B_FALSE;
proxy->r_read_wanted = B_FALSE;
proxy = kmem_alloc(sizeof (locked_range_t), KM_SLEEP);
proxy->lr_offset = lr->lr_offset;
proxy->lr_length = lr->lr_length;
proxy->lr_count = 1;
proxy->lr_type = RL_READER;
proxy->lr_proxy = B_TRUE;
proxy->lr_write_wanted = B_FALSE;
proxy->lr_read_wanted = B_FALSE;
avl_add(tree, proxy);
return (proxy);
@ -220,29 +240,27 @@ zfs_range_proxify(avl_tree_t *tree, rl_t *rl)
* Split the range lock at the supplied offset
* returning the *front* proxy.
*/
static rl_t *
zfs_range_split(avl_tree_t *tree, rl_t *rl, uint64_t off)
static locked_range_t *
rangelock_split(avl_tree_t *tree, locked_range_t *lr, uint64_t off)
{
rl_t *front, *rear;
ASSERT3U(rl->r_len, >, 1);
ASSERT3U(off, >, rl->r_off);
ASSERT3U(off, <, rl->r_off + rl->r_len);
ASSERT(rl->r_write_wanted == B_FALSE);
ASSERT(rl->r_read_wanted == B_FALSE);
ASSERT3U(lr->lr_length, >, 1);
ASSERT3U(off, >, lr->lr_offset);
ASSERT3U(off, <, lr->lr_offset + lr->lr_length);
ASSERT(lr->lr_write_wanted == B_FALSE);
ASSERT(lr->lr_read_wanted == B_FALSE);
/* create the rear proxy range lock */
rear = kmem_alloc(sizeof (rl_t), KM_SLEEP);
rear->r_off = off;
rear->r_len = rl->r_off + rl->r_len - off;
rear->r_cnt = rl->r_cnt;
rear->r_type = RL_READER;
rear->r_proxy = B_TRUE;
rear->r_write_wanted = B_FALSE;
rear->r_read_wanted = B_FALSE;
locked_range_t *rear = kmem_alloc(sizeof (locked_range_t), KM_SLEEP);
rear->lr_offset = off;
rear->lr_length = lr->lr_offset + lr->lr_length - off;
rear->lr_count = lr->lr_count;
rear->lr_type = RL_READER;
rear->lr_proxy = B_TRUE;
rear->lr_write_wanted = B_FALSE;
rear->lr_read_wanted = B_FALSE;
front = zfs_range_proxify(tree, rl);
front->r_len = off - rl->r_off;
locked_range_t *front = rangelock_proxify(tree, lr);
front->lr_length = off - lr->lr_offset;
avl_insert_here(tree, rear, front, AVL_AFTER);
return (front);
@ -252,28 +270,27 @@ zfs_range_split(avl_tree_t *tree, rl_t *rl, uint64_t off)
* Create and add a new proxy range lock for the supplied range.
*/
static void
zfs_range_new_proxy(avl_tree_t *tree, uint64_t off, uint64_t len)
rangelock_new_proxy(avl_tree_t *tree, uint64_t off, uint64_t len)
{
rl_t *rl;
ASSERT(len);
rl = kmem_alloc(sizeof (rl_t), KM_SLEEP);
rl->r_off = off;
rl->r_len = len;
rl->r_cnt = 1;
rl->r_type = RL_READER;
rl->r_proxy = B_TRUE;
rl->r_write_wanted = B_FALSE;
rl->r_read_wanted = B_FALSE;
avl_add(tree, rl);
ASSERT(len != 0);
locked_range_t *lr = kmem_alloc(sizeof (locked_range_t), KM_SLEEP);
lr->lr_offset = off;
lr->lr_length = len;
lr->lr_count = 1;
lr->lr_type = RL_READER;
lr->lr_proxy = B_TRUE;
lr->lr_write_wanted = B_FALSE;
lr->lr_read_wanted = B_FALSE;
avl_add(tree, lr);
}
static void
zfs_range_add_reader(avl_tree_t *tree, rl_t *new, rl_t *prev, avl_index_t where)
rangelock_add_reader(avl_tree_t *tree, locked_range_t *new,
locked_range_t *prev, avl_index_t where)
{
rl_t *next;
uint64_t off = new->r_off;
uint64_t len = new->r_len;
locked_range_t *next;
uint64_t off = new->lr_offset;
uint64_t len = new->lr_length;
/*
* prev arrives either:
@ -282,37 +299,37 @@ zfs_range_add_reader(avl_tree_t *tree, rl_t *new, rl_t *prev, avl_index_t where)
* range may overlap with the new range
* - null, if there were no ranges starting before the new one
*/
if (prev) {
if (prev->r_off + prev->r_len <= off) {
if (prev != NULL) {
if (prev->lr_offset + prev->lr_length <= off) {
prev = NULL;
} else if (prev->r_off != off) {
} else if (prev->lr_offset != off) {
/*
* convert to proxy if needed then
* split this entry and bump ref count
*/
prev = zfs_range_split(tree, prev, off);
prev = rangelock_split(tree, prev, off);
prev = AVL_NEXT(tree, prev); /* move to rear range */
}
}
ASSERT((prev == NULL) || (prev->r_off == off));
ASSERT((prev == NULL) || (prev->lr_offset == off));
if (prev)
if (prev != NULL)
next = prev;
else
next = (rl_t *)avl_nearest(tree, where, AVL_AFTER);
next = avl_nearest(tree, where, AVL_AFTER);
if (next == NULL || off + len <= next->r_off) {
if (next == NULL || off + len <= next->lr_offset) {
/* no overlaps, use the original new rl_t in the tree */
avl_insert(tree, new, where);
return;
}
if (off < next->r_off) {
if (off < next->lr_offset) {
/* Add a proxy for initial range before the overlap */
zfs_range_new_proxy(tree, off, next->r_off - off);
rangelock_new_proxy(tree, off, next->lr_offset - off);
}
new->r_cnt = 0; /* will use proxies in tree */
new->lr_count = 0; /* will use proxies in tree */
/*
* We now search forward through the ranges, until we go past the end
* of the new range. For each entry we make it a proxy if it
@ -320,47 +337,51 @@ zfs_range_add_reader(avl_tree_t *tree, rl_t *new, rl_t *prev, avl_index_t where)
* gaps between the ranges then we create a new proxy range.
*/
for (prev = NULL; next; prev = next, next = AVL_NEXT(tree, next)) {
if (off + len <= next->r_off)
if (off + len <= next->lr_offset)
break;
if (prev && prev->r_off + prev->r_len < next->r_off) {
if (prev != NULL && prev->lr_offset + prev->lr_length <
next->lr_offset) {
/* there's a gap */
ASSERT3U(next->r_off, >, prev->r_off + prev->r_len);
zfs_range_new_proxy(tree, prev->r_off + prev->r_len,
next->r_off - (prev->r_off + prev->r_len));
ASSERT3U(next->lr_offset, >,
prev->lr_offset + prev->lr_length);
rangelock_new_proxy(tree,
prev->lr_offset + prev->lr_length,
next->lr_offset -
(prev->lr_offset + prev->lr_length));
}
if (off + len == next->r_off + next->r_len) {
if (off + len == next->lr_offset + next->lr_length) {
/* exact overlap with end */
next = zfs_range_proxify(tree, next);
next->r_cnt++;
next = rangelock_proxify(tree, next);
next->lr_count++;
return;
}
if (off + len < next->r_off + next->r_len) {
if (off + len < next->lr_offset + next->lr_length) {
/* new range ends in the middle of this block */
next = zfs_range_split(tree, next, off + len);
next->r_cnt++;
next = rangelock_split(tree, next, off + len);
next->lr_count++;
return;
}
ASSERT3U(off + len, >, next->r_off + next->r_len);
next = zfs_range_proxify(tree, next);
next->r_cnt++;
ASSERT3U(off + len, >, next->lr_offset + next->lr_length);
next = rangelock_proxify(tree, next);
next->lr_count++;
}
/* Add the remaining end range. */
zfs_range_new_proxy(tree, prev->r_off + prev->r_len,
(off + len) - (prev->r_off + prev->r_len));
rangelock_new_proxy(tree, prev->lr_offset + prev->lr_length,
(off + len) - (prev->lr_offset + prev->lr_length));
}
/*
* Check if a reader lock can be grabbed, or wait and recheck until available.
*/
static void
zfs_range_lock_reader(zfs_rlock_t *zrl, rl_t *new)
rangelock_enter_reader(rangelock_t *rl, locked_range_t *new)
{
avl_tree_t *tree = &zrl->zr_avl;
rl_t *prev, *next;
avl_tree_t *tree = &rl->rl_tree;
locked_range_t *prev, *next;
avl_index_t where;
uint64_t off = new->r_off;
uint64_t len = new->r_len;
uint64_t off = new->lr_offset;
uint64_t len = new->lr_length;
/*
* Look for any writer locks in the range.
@ -368,21 +389,22 @@ zfs_range_lock_reader(zfs_rlock_t *zrl, rl_t *new)
retry:
prev = avl_find(tree, new, &where);
if (prev == NULL)
prev = (rl_t *)avl_nearest(tree, where, AVL_BEFORE);
prev = (locked_range_t *)avl_nearest(tree, where, AVL_BEFORE);
/*
* Check the previous range for a writer lock overlap.
*/
if (prev && (off < prev->r_off + prev->r_len)) {
if ((prev->r_type == RL_WRITER) || (prev->r_write_wanted)) {
if (!prev->r_read_wanted) {
cv_init(&prev->r_rd_cv, NULL, CV_DEFAULT, NULL);
prev->r_read_wanted = B_TRUE;
if (prev && (off < prev->lr_offset + prev->lr_length)) {
if ((prev->lr_type == RL_WRITER) || (prev->lr_write_wanted)) {
if (!prev->lr_read_wanted) {
cv_init(&prev->lr_read_cv,
NULL, CV_DEFAULT, NULL);
prev->lr_read_wanted = B_TRUE;
}
cv_wait(&prev->r_rd_cv, &zrl->zr_mutex);
cv_wait(&prev->lr_read_cv, &rl->rl_lock);
goto retry;
}
if (off + len < prev->r_off + prev->r_len)
if (off + len < prev->lr_offset + prev->lr_length)
goto got_lock;
}
@ -390,95 +412,97 @@ retry:
* Search through the following ranges to see if there's
* write lock any overlap.
*/
if (prev)
if (prev != NULL)
next = AVL_NEXT(tree, prev);
else
next = (rl_t *)avl_nearest(tree, where, AVL_AFTER);
for (; next; next = AVL_NEXT(tree, next)) {
if (off + len <= next->r_off)
next = (locked_range_t *)avl_nearest(tree, where, AVL_AFTER);
for (; next != NULL; next = AVL_NEXT(tree, next)) {
if (off + len <= next->lr_offset)
goto got_lock;
if ((next->r_type == RL_WRITER) || (next->r_write_wanted)) {
if (!next->r_read_wanted) {
cv_init(&next->r_rd_cv, NULL, CV_DEFAULT, NULL);
next->r_read_wanted = B_TRUE;
if ((next->lr_type == RL_WRITER) || (next->lr_write_wanted)) {
if (!next->lr_read_wanted) {
cv_init(&next->lr_read_cv,
NULL, CV_DEFAULT, NULL);
next->lr_read_wanted = B_TRUE;
}
cv_wait(&next->r_rd_cv, &zrl->zr_mutex);
cv_wait(&next->lr_read_cv, &rl->rl_lock);
goto retry;
}
if (off + len <= next->r_off + next->r_len)
if (off + len <= next->lr_offset + next->lr_length)
goto got_lock;
}
got_lock:
/*
* Add the read lock, which may involve splitting existing
* locks and bumping ref counts (r_cnt).
* locks and bumping ref counts (r_count).
*/
zfs_range_add_reader(tree, new, prev, where);
rangelock_add_reader(tree, new, prev, where);
}
/*
* Lock a range (offset, length) as either shared (RL_READER)
* or exclusive (RL_WRITER). Returns the range lock structure
* for later unlocking or reduce range (if entire file
* previously locked as RL_WRITER).
* Lock a range (offset, length) as either shared (RL_READER) or exclusive
* (RL_WRITER or RL_APPEND). If RL_APPEND is specified, rl_cb() will convert
* it to a RL_WRITER lock (with the offset at the end of the file). Returns
* the range lock structure for later unlocking (or reduce range if the
* entire file is locked as RL_WRITER).
*/
rl_t *
zfs_range_lock(zfs_rlock_t *zrl, uint64_t off, uint64_t len, rl_type_t type)
locked_range_t *
rangelock_enter(rangelock_t *rl, uint64_t off, uint64_t len,
rangelock_type_t type)
{
rl_t *new;
ASSERT(type == RL_READER || type == RL_WRITER || type == RL_APPEND);
new = kmem_alloc(sizeof (rl_t), KM_SLEEP);
new->r_zrl = zrl;
new->r_off = off;
locked_range_t *new = kmem_alloc(sizeof (locked_range_t), KM_SLEEP);
new->lr_rangelock = rl;
new->lr_offset = off;
if (len + off < off) /* overflow */
len = UINT64_MAX - off;
new->r_len = len;
new->r_cnt = 1; /* assume it's going to be in the tree */
new->r_type = type;
new->r_proxy = B_FALSE;
new->r_write_wanted = B_FALSE;
new->r_read_wanted = B_FALSE;
new->lr_length = len;
new->lr_count = 1; /* assume it's going to be in the tree */
new->lr_type = type;
new->lr_proxy = B_FALSE;
new->lr_write_wanted = B_FALSE;
new->lr_read_wanted = B_FALSE;
mutex_enter(&zrl->zr_mutex);
mutex_enter(&rl->rl_lock);
if (type == RL_READER) {
/*
* First check for the usual case of no locks
*/
if (avl_numnodes(&zrl->zr_avl) == 0)
avl_add(&zrl->zr_avl, new);
if (avl_numnodes(&rl->rl_tree) == 0)
avl_add(&rl->rl_tree, new);
else
zfs_range_lock_reader(zrl, new);
} else /* RL_WRITER or RL_APPEND */
zfs_range_lock_writer(zrl, new);
mutex_exit(&zrl->zr_mutex);
rangelock_enter_reader(rl, new);
} else
rangelock_enter_writer(rl, new); /* RL_WRITER or RL_APPEND */
mutex_exit(&rl->rl_lock);
return (new);
}
/*
* Safely free the locked_range_t.
*/
static void
zfs_range_free(void *arg)
rangelock_free(locked_range_t *lr)
{
rl_t *rl = arg;
if (lr->lr_write_wanted)
cv_destroy(&lr->lr_write_cv);
if (rl->r_write_wanted)
cv_destroy(&rl->r_wr_cv);
if (lr->lr_read_wanted)
cv_destroy(&lr->lr_read_cv);
if (rl->r_read_wanted)
cv_destroy(&rl->r_rd_cv);
kmem_free(rl, sizeof (rl_t));
kmem_free(lr, sizeof (locked_range_t));
}
/*
* Unlock a reader lock
*/
static void
zfs_range_unlock_reader(zfs_rlock_t *zrl, rl_t *remove, list_t *free_list)
rangelock_exit_reader(rangelock_t *rl, locked_range_t *remove,
list_t *free_list)
{
avl_tree_t *tree = &zrl->zr_avl;
rl_t *rl, *next = NULL;
avl_tree_t *tree = &rl->rl_tree;
uint64_t len;
/*
@ -488,53 +512,48 @@ zfs_range_unlock_reader(zfs_rlock_t *zrl, rl_t *remove, list_t *free_list)
* removed from the tree and replaced by proxies (one or
* more ranges mapping to the entire range).
*/
if (remove->r_cnt == 1) {
if (remove->lr_count == 1) {
avl_remove(tree, remove);
if (remove->r_write_wanted)
cv_broadcast(&remove->r_wr_cv);
if (remove->r_read_wanted)
cv_broadcast(&remove->r_rd_cv);
if (remove->lr_write_wanted)
cv_broadcast(&remove->lr_write_cv);
if (remove->lr_read_wanted)
cv_broadcast(&remove->lr_read_cv);
list_insert_tail(free_list, remove);
} else {
ASSERT0(remove->r_cnt);
ASSERT0(remove->r_write_wanted);
ASSERT0(remove->r_read_wanted);
ASSERT0(remove->lr_count);
ASSERT0(remove->lr_write_wanted);
ASSERT0(remove->lr_read_wanted);
/*
* Find start proxy representing this reader lock,
* then decrement ref count on all proxies
* that make up this range, freeing them as needed.
*/
rl = avl_find(tree, remove, NULL);
ASSERT(rl);
ASSERT(rl->r_cnt);
ASSERT(rl->r_type == RL_READER);
for (len = remove->r_len; len != 0; rl = next) {
len -= rl->r_len;
if (len) {
next = AVL_NEXT(tree, rl);
ASSERT(next);
ASSERT(rl->r_off + rl->r_len == next->r_off);
ASSERT(next->r_cnt);
ASSERT(next->r_type == RL_READER);
locked_range_t *lr = avl_find(tree, remove, NULL);
ASSERT3P(lr, !=, NULL);
ASSERT3U(lr->lr_count, !=, 0);
ASSERT3U(lr->lr_type, ==, RL_READER);
locked_range_t *next = NULL;
for (len = remove->lr_length; len != 0; lr = next) {
len -= lr->lr_length;
if (len != 0) {
next = AVL_NEXT(tree, lr);
ASSERT3P(next, !=, NULL);
ASSERT3U(lr->lr_offset + lr->lr_length, ==,
next->lr_offset);
ASSERT3U(next->lr_count, !=, 0);
ASSERT3U(next->lr_type, ==, RL_READER);
}
rl->r_cnt--;
if (rl->r_cnt == 0) {
avl_remove(tree, rl);
if (rl->r_write_wanted)
cv_broadcast(&rl->r_wr_cv);
if (rl->r_read_wanted)
cv_broadcast(&rl->r_rd_cv);
list_insert_tail(free_list, rl);
lr->lr_count--;
if (lr->lr_count == 0) {
avl_remove(tree, lr);
if (lr->lr_write_wanted)
cv_broadcast(&lr->lr_write_cv);
if (lr->lr_read_wanted)
cv_broadcast(&lr->lr_read_cv);
list_insert_tail(free_list, lr);
}
}
kmem_free(remove, sizeof (rl_t));
kmem_free(remove, sizeof (locked_range_t));
}
}
@ -542,91 +561,79 @@ zfs_range_unlock_reader(zfs_rlock_t *zrl, rl_t *remove, list_t *free_list)
* Unlock range and destroy range lock structure.
*/
void
zfs_range_unlock(rl_t *rl)
rangelock_exit(locked_range_t *lr)
{
zfs_rlock_t *zrl = rl->r_zrl;
rangelock_t *rl = lr->lr_rangelock;
list_t free_list;
rl_t *free_rl;
locked_range_t *free_lr;
ASSERT(rl->r_type == RL_WRITER || rl->r_type == RL_READER);
ASSERT(rl->r_cnt == 1 || rl->r_cnt == 0);
ASSERT(!rl->r_proxy);
list_create(&free_list, sizeof (rl_t), offsetof(rl_t, rl_node));
ASSERT(lr->lr_type == RL_WRITER || lr->lr_type == RL_READER);
ASSERT(lr->lr_count == 1 || lr->lr_count == 0);
ASSERT(!lr->lr_proxy);
mutex_enter(&zrl->zr_mutex);
if (rl->r_type == RL_WRITER) {
/*
* The free list is used to defer the cv_destroy() and
* subsequent kmem_free until after the mutex is dropped.
*/
list_create(&free_list, sizeof (locked_range_t),
offsetof(locked_range_t, lr_node));
mutex_enter(&rl->rl_lock);
if (lr->lr_type == RL_WRITER) {
/* writer locks can't be shared or split */
avl_remove(&zrl->zr_avl, rl);
if (rl->r_write_wanted)
cv_broadcast(&rl->r_wr_cv);
if (rl->r_read_wanted)
cv_broadcast(&rl->r_rd_cv);
list_insert_tail(&free_list, rl);
avl_remove(&rl->rl_tree, lr);
if (lr->lr_write_wanted)
cv_broadcast(&lr->lr_write_cv);
if (lr->lr_read_wanted)
cv_broadcast(&lr->lr_read_cv);
list_insert_tail(&free_list, lr);
} else {
/*
* lock may be shared, let zfs_range_unlock_reader()
* release the zp->z_range_lock lock and free the rl_t
* lock may be shared, let rangelock_exit_reader()
* release the lock and free the locked_range_t.
*/
zfs_range_unlock_reader(zrl, rl, &free_list);
rangelock_exit_reader(rl, lr, &free_list);
}
mutex_exit(&zrl->zr_mutex);
mutex_exit(&rl->rl_lock);
while ((free_rl = list_head(&free_list)) != NULL) {
list_remove(&free_list, free_rl);
zfs_range_free(free_rl);
}
while ((free_lr = list_remove_head(&free_list)) != NULL)
rangelock_free(free_lr);
list_destroy(&free_list);
}
/*
* Reduce range locked as RL_WRITER from whole file to specified range.
* Asserts the whole file is exclusivly locked and so there's only one
* Asserts the whole file is exclusively locked and so there's only one
* entry in the tree.
*/
void
zfs_range_reduce(rl_t *rl, uint64_t off, uint64_t len)
rangelock_reduce(locked_range_t *lr, uint64_t off, uint64_t len)
{
zfs_rlock_t *zrl = rl->r_zrl;
rangelock_t *rl = lr->lr_rangelock;
/* Ensure there are no other locks */
ASSERT(avl_numnodes(&zrl->zr_avl) == 1);
ASSERT(rl->r_off == 0);
ASSERT(rl->r_type == RL_WRITER);
ASSERT(!rl->r_proxy);
ASSERT3U(rl->r_len, ==, UINT64_MAX);
ASSERT3U(rl->r_cnt, ==, 1);
ASSERT3U(avl_numnodes(&rl->rl_tree), ==, 1);
ASSERT3U(lr->lr_offset, ==, 0);
ASSERT3U(lr->lr_type, ==, RL_WRITER);
ASSERT(!lr->lr_proxy);
ASSERT3U(lr->lr_length, ==, UINT64_MAX);
ASSERT3U(lr->lr_count, ==, 1);
mutex_enter(&zrl->zr_mutex);
rl->r_off = off;
rl->r_len = len;
if (rl->r_write_wanted)
cv_broadcast(&rl->r_wr_cv);
if (rl->r_read_wanted)
cv_broadcast(&rl->r_rd_cv);
mutex_exit(&zrl->zr_mutex);
mutex_enter(&rl->rl_lock);
lr->lr_offset = off;
lr->lr_length = len;
mutex_exit(&rl->rl_lock);
if (lr->lr_write_wanted)
cv_broadcast(&lr->lr_write_cv);
if (lr->lr_read_wanted)
cv_broadcast(&lr->lr_read_cv);
}
/*
* AVL comparison function used to order range locks
* Locks are ordered on the start offset of the range.
*/
int
zfs_range_compare(const void *arg1, const void *arg2)
{
const rl_t *rl1 = (const rl_t *)arg1;
const rl_t *rl2 = (const rl_t *)arg2;
return (AVL_CMP(rl1->r_off, rl2->r_off));
}
#ifdef _KERNEL
EXPORT_SYMBOL(zfs_range_lock);
EXPORT_SYMBOL(zfs_range_unlock);
EXPORT_SYMBOL(zfs_range_reduce);
EXPORT_SYMBOL(zfs_range_compare);
#if defined(_KERNEL)
EXPORT_SYMBOL(rangelock_init);
EXPORT_SYMBOL(rangelock_fini);
EXPORT_SYMBOL(rangelock_enter);
EXPORT_SYMBOL(rangelock_exit);
EXPORT_SYMBOL(rangelock_reduce);
#endif

View File

@ -477,7 +477,7 @@ zfs_read(struct inode *ip, uio_t *uio, int ioflag, cred_t *cr)
/*
* Lock the range against changes.
*/
rl_t *rl = zfs_range_lock(&zp->z_range_lock,
locked_range_t *lr = rangelock_enter(&zp->z_rangelock,
uio->uio_loffset, uio->uio_resid, RL_READER);
/*
@ -550,7 +550,7 @@ zfs_read(struct inode *ip, uio_t *uio, int ioflag, cred_t *cr)
dataset_kstats_update_read_kstats(&zfsvfs->z_kstat, nread);
task_io_account_read(nread);
out:
zfs_range_unlock(rl);
rangelock_exit(lr);
ZFS_EXIT(zfsvfs);
return (error);
@ -652,19 +652,18 @@ zfs_write(struct inode *ip, uio_t *uio, int ioflag, cred_t *cr)
#endif
uio_prefaultpages(MIN(n, max_blksz), uio);
rl_t *rl;
/*
* If in append mode, set the io offset pointer to eof.
*/
locked_range_t *lr;
if (ioflag & FAPPEND) {
/*
* Obtain an appending range lock to guarantee file append
* semantics. We reset the write offset once we have the lock.
*/
rl = zfs_range_lock(&zp->z_range_lock, 0, n, RL_APPEND);
woff = rl->r_off;
if (rl->r_len == UINT64_MAX) {
lr = rangelock_enter(&zp->z_rangelock, 0, n, RL_APPEND);
woff = lr->lr_offset;
if (lr->lr_length == UINT64_MAX) {
/*
* We overlocked the file because this write will cause
* the file block size to increase.
@ -679,11 +678,11 @@ zfs_write(struct inode *ip, uio_t *uio, int ioflag, cred_t *cr)
* this write, then this range lock will lock the entire file
* so that we can re-write the block safely.
*/
rl = zfs_range_lock(&zp->z_range_lock, woff, n, RL_WRITER);
lr = rangelock_enter(&zp->z_rangelock, woff, n, RL_WRITER);
}
if (woff >= limit) {
zfs_range_unlock(rl);
rangelock_exit(lr);
ZFS_EXIT(zfsvfs);
return (SET_ERROR(EFBIG));
}
@ -776,12 +775,12 @@ zfs_write(struct inode *ip, uio_t *uio, int ioflag, cred_t *cr)
}
/*
* If zfs_range_lock() over-locked we grow the blocksize
* If rangelock_enter() over-locked we grow the blocksize
* and then reduce the lock range. This will only happen
* on the first iteration since zfs_range_reduce() will
* shrink down r_len to the appropriate size.
* on the first iteration since rangelock_reduce() will
* shrink down lr_length to the appropriate size.
*/
if (rl->r_len == UINT64_MAX) {
if (lr->lr_length == UINT64_MAX) {
uint64_t new_blksz;
if (zp->z_blksz > max_blksz) {
@ -797,7 +796,7 @@ zfs_write(struct inode *ip, uio_t *uio, int ioflag, cred_t *cr)
new_blksz = MIN(end_size, max_blksz);
}
zfs_grow_blocksize(zp, new_blksz, tx);
zfs_range_reduce(rl, woff, n);
rangelock_reduce(lr, woff, n);
}
/*
@ -915,7 +914,7 @@ zfs_write(struct inode *ip, uio_t *uio, int ioflag, cred_t *cr)
}
zfs_inode_update(zp);
zfs_range_unlock(rl);
rangelock_exit(lr);
/*
* If we're in replay mode, or we made no progress, return error.
@ -967,7 +966,7 @@ zfs_get_done(zgd_t *zgd, int error)
if (zgd->zgd_db)
dmu_buf_rele(zgd->zgd_db, zgd);
zfs_range_unlock(zgd->zgd_rl);
rangelock_exit(zgd->zgd_lr);
/*
* Release the vnode asynchronously as we currently have the
@ -1031,8 +1030,8 @@ zfs_get_data(void *arg, lr_write_t *lr, char *buf, struct lwb *lwb, zio_t *zio)
* we don't have to write the data twice.
*/
if (buf != NULL) { /* immediate write */
zgd->zgd_rl = zfs_range_lock(&zp->z_range_lock, offset, size,
RL_READER);
zgd->zgd_lr = rangelock_enter(&zp->z_rangelock,
offset, size, RL_READER);
/* test for truncation needs to be done while range locked */
if (offset >= zp->z_size) {
error = SET_ERROR(ENOENT);
@ -1053,12 +1052,12 @@ zfs_get_data(void *arg, lr_write_t *lr, char *buf, struct lwb *lwb, zio_t *zio)
size = zp->z_blksz;
blkoff = ISP2(size) ? P2PHASE(offset, size) : offset;
offset -= blkoff;
zgd->zgd_rl = zfs_range_lock(&zp->z_range_lock, offset,
size, RL_READER);
zgd->zgd_lr = rangelock_enter(&zp->z_rangelock,
offset, size, RL_READER);
if (zp->z_blksz == size)
break;
offset += blkoff;
zfs_range_unlock(zgd->zgd_rl);
rangelock_exit(zgd->zgd_lr);
}
/* test for truncation needs to be done while range locked */
if (lr->lr_offset >= zp->z_size)
@ -4432,7 +4431,6 @@ zfs_putpage(struct inode *ip, struct page *pp, struct writeback_control *wbc)
loff_t offset;
loff_t pgoff;
unsigned int pglen;
rl_t *rl;
dmu_tx_t *tx;
caddr_t va;
int err = 0;
@ -4506,13 +4504,14 @@ zfs_putpage(struct inode *ip, struct page *pp, struct writeback_control *wbc)
redirty_page_for_writepage(wbc, pp);
unlock_page(pp);
rl = zfs_range_lock(&zp->z_range_lock, pgoff, pglen, RL_WRITER);
locked_range_t *lr = rangelock_enter(&zp->z_rangelock,
pgoff, pglen, RL_WRITER);
lock_page(pp);
/* Page mapping changed or it was no longer dirty, we're done */
if (unlikely((mapping != pp->mapping) || !PageDirty(pp))) {
unlock_page(pp);
zfs_range_unlock(rl);
rangelock_exit(lr);
ZFS_EXIT(zfsvfs);
return (0);
}
@ -4520,7 +4519,7 @@ zfs_putpage(struct inode *ip, struct page *pp, struct writeback_control *wbc)
/* Another process started write block if required */
if (PageWriteback(pp)) {
unlock_page(pp);
zfs_range_unlock(rl);
rangelock_exit(lr);
if (wbc->sync_mode != WB_SYNC_NONE)
wait_on_page_writeback(pp);
@ -4532,7 +4531,7 @@ zfs_putpage(struct inode *ip, struct page *pp, struct writeback_control *wbc)
/* Clear the dirty flag the required locks are held */
if (!clear_page_dirty_for_io(pp)) {
unlock_page(pp);
zfs_range_unlock(rl);
rangelock_exit(lr);
ZFS_EXIT(zfsvfs);
return (0);
}
@ -4559,7 +4558,7 @@ zfs_putpage(struct inode *ip, struct page *pp, struct writeback_control *wbc)
__set_page_dirty_nobuffers(pp);
ClearPageError(pp);
end_page_writeback(pp);
zfs_range_unlock(rl);
rangelock_exit(lr);
ZFS_EXIT(zfsvfs);
return (err);
}
@ -4586,7 +4585,7 @@ zfs_putpage(struct inode *ip, struct page *pp, struct writeback_control *wbc)
zfs_putpage_commit_cb, pp);
dmu_tx_commit(tx);
zfs_range_unlock(rl);
rangelock_exit(lr);
if (wbc->sync_mode != WB_SYNC_NONE) {
/*

View File

@ -20,7 +20,7 @@
*/
/*
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2014 by Delphix. All rights reserved.
* Copyright (c) 2012, 2018 by Delphix. All rights reserved.
*/
/* Portions Copyright 2007 Jeremy Teo */
@ -91,6 +91,37 @@ static kmem_cache_t *znode_cache = NULL;
static kmem_cache_t *znode_hold_cache = NULL;
unsigned int zfs_object_mutex_size = ZFS_OBJ_MTX_SZ;
/*
* This callback is invoked when acquiring a RL_WRITER or RL_APPEND lock on
* z_rangelock. It will modify the offset and length of the lock to reflect
* znode-specific information, and convert RL_APPEND to RL_WRITER. This is
* called with the rangelock_t's rl_lock held, which avoids races.
*/
static void
zfs_rangelock_cb(locked_range_t *new, void *arg)
{
znode_t *zp = arg;
/*
* If in append mode, convert to writer and lock starting at the
* current end of file.
*/
if (new->lr_type == RL_APPEND) {
new->lr_offset = zp->z_size;
new->lr_type = RL_WRITER;
}
/*
* If we need to grow the block size then lock the whole file range.
*/
uint64_t end_size = MAX(zp->z_size, new->lr_offset + new->lr_length);
if (end_size > zp->z_blksz && (!ISP2(zp->z_blksz) ||
zp->z_blksz < ZTOZSB(zp)->z_max_blksz)) {
new->lr_offset = 0;
new->lr_length = UINT64_MAX;
}
}
/*ARGSUSED*/
static int
zfs_znode_cache_constructor(void *buf, void *arg, int kmflags)
@ -106,7 +137,7 @@ zfs_znode_cache_constructor(void *buf, void *arg, int kmflags)
mutex_init(&zp->z_acl_lock, NULL, MUTEX_DEFAULT, NULL);
rw_init(&zp->z_xattr_lock, NULL, RW_DEFAULT, NULL);
zfs_rlock_init(&zp->z_range_lock);
rangelock_init(&zp->z_rangelock, zfs_rangelock_cb, zp);
zp->z_dirlocks = NULL;
zp->z_acl_cached = NULL;
@ -128,7 +159,7 @@ zfs_znode_cache_destructor(void *buf, void *arg)
rw_destroy(&zp->z_name_lock);
mutex_destroy(&zp->z_acl_lock);
rw_destroy(&zp->z_xattr_lock);
zfs_rlock_destroy(&zp->z_range_lock);
rangelock_fini(&zp->z_rangelock);
ASSERT(zp->z_dirlocks == NULL);
ASSERT(zp->z_acl_cached == NULL);
@ -577,9 +608,6 @@ zfs_znode_alloc(zfsvfs_t *zfsvfs, dmu_buf_t *db, int blksz,
zp->z_is_mapped = B_FALSE;
zp->z_is_ctldir = B_FALSE;
zp->z_is_stale = B_FALSE;
zp->z_range_lock.zr_size = &zp->z_size;
zp->z_range_lock.zr_blksz = &zp->z_blksz;
zp->z_range_lock.zr_max_blksz = &ZTOZSB(zp)->z_max_blksz;
zfs_znode_sa_init(zfsvfs, zp, db, obj_type, hdl);
@ -1475,20 +1503,20 @@ zfs_extend(znode_t *zp, uint64_t end)
{
zfsvfs_t *zfsvfs = ZTOZSB(zp);
dmu_tx_t *tx;
rl_t *rl;
locked_range_t *lr;
uint64_t newblksz;
int error;
/*
* We will change zp_size, lock the whole file.
*/
rl = zfs_range_lock(&zp->z_range_lock, 0, UINT64_MAX, RL_WRITER);
lr = rangelock_enter(&zp->z_rangelock, 0, UINT64_MAX, RL_WRITER);
/*
* Nothing to do if file already at desired length.
*/
if (end <= zp->z_size) {
zfs_range_unlock(rl);
rangelock_exit(lr);
return (0);
}
tx = dmu_tx_create(zfsvfs->z_os);
@ -1518,7 +1546,7 @@ zfs_extend(znode_t *zp, uint64_t end)
error = dmu_tx_assign(tx, TXG_WAIT);
if (error) {
dmu_tx_abort(tx);
zfs_range_unlock(rl);
rangelock_exit(lr);
return (error);
}
@ -1530,7 +1558,7 @@ zfs_extend(znode_t *zp, uint64_t end)
VERIFY(0 == sa_update(zp->z_sa_hdl, SA_ZPL_SIZE(ZTOZSB(zp)),
&zp->z_size, sizeof (zp->z_size), tx));
zfs_range_unlock(rl);
rangelock_exit(lr);
dmu_tx_commit(tx);
@ -1593,19 +1621,19 @@ static int
zfs_free_range(znode_t *zp, uint64_t off, uint64_t len)
{
zfsvfs_t *zfsvfs = ZTOZSB(zp);
rl_t *rl;
locked_range_t *lr;
int error;
/*
* Lock the range being freed.
*/
rl = zfs_range_lock(&zp->z_range_lock, off, len, RL_WRITER);
lr = rangelock_enter(&zp->z_rangelock, off, len, RL_WRITER);
/*
* Nothing to do if file already at desired length.
*/
if (off >= zp->z_size) {
zfs_range_unlock(rl);
rangelock_exit(lr);
return (0);
}
@ -1655,7 +1683,7 @@ zfs_free_range(znode_t *zp, uint64_t off, uint64_t len)
page_len);
}
}
zfs_range_unlock(rl);
rangelock_exit(lr);
return (error);
}
@ -1673,7 +1701,7 @@ zfs_trunc(znode_t *zp, uint64_t end)
{
zfsvfs_t *zfsvfs = ZTOZSB(zp);
dmu_tx_t *tx;
rl_t *rl;
locked_range_t *lr;
int error;
sa_bulk_attr_t bulk[2];
int count = 0;
@ -1681,20 +1709,20 @@ zfs_trunc(znode_t *zp, uint64_t end)
/*
* We will change zp_size, lock the whole file.
*/
rl = zfs_range_lock(&zp->z_range_lock, 0, UINT64_MAX, RL_WRITER);
lr = rangelock_enter(&zp->z_rangelock, 0, UINT64_MAX, RL_WRITER);
/*
* Nothing to do if file already at desired length.
*/
if (end >= zp->z_size) {
zfs_range_unlock(rl);
rangelock_exit(lr);
return (0);
}
error = dmu_free_long_range(zfsvfs->z_os, zp->z_id, end,
DMU_OBJECT_END);
if (error) {
zfs_range_unlock(rl);
rangelock_exit(lr);
return (error);
}
tx = dmu_tx_create(zfsvfs->z_os);
@ -1704,7 +1732,7 @@ zfs_trunc(znode_t *zp, uint64_t end)
error = dmu_tx_assign(tx, TXG_WAIT);
if (error) {
dmu_tx_abort(tx);
zfs_range_unlock(rl);
rangelock_exit(lr);
return (error);
}
@ -1720,8 +1748,7 @@ zfs_trunc(znode_t *zp, uint64_t end)
VERIFY(sa_bulk_update(zp->z_sa_hdl, bulk, count, tx) == 0);
dmu_tx_commit(tx);
zfs_range_unlock(rl);
rangelock_exit(lr);
return (0);
}

View File

@ -86,7 +86,6 @@
#include <sys/dmu_tx.h>
#include <sys/zio.h>
#include <sys/zfs_rlock.h>
#include <sys/zfs_znode.h>
#include <sys/spa_impl.h>
#include <sys/zvol.h>
@ -123,7 +122,7 @@ struct zvol_state {
uint32_t zv_open_count; /* open counts */
uint32_t zv_changed; /* disk changed */
zilog_t *zv_zilog; /* ZIL handle */
zfs_rlock_t zv_range_lock; /* range lock */
rangelock_t zv_rangelock; /* for range locking */
dnode_t *zv_dn; /* dnode hold */
dev_t zv_dev; /* device id */
struct gendisk *zv_disk; /* generic disk */
@ -716,7 +715,7 @@ zvol_log_write(zvol_state_t *zv, dmu_tx_t *tx, uint64_t offset,
typedef struct zv_request {
zvol_state_t *zv;
struct bio *bio;
rl_t *rl;
locked_range_t *lr;
} zv_request_t;
static void
@ -778,7 +777,7 @@ zvol_write(void *arg)
if (error)
break;
}
zfs_range_unlock(zvr->rl);
rangelock_exit(zvr->lr);
int64_t nwritten = start_resid - uio.uio_resid;
dataset_kstats_update_write_kstats(&zv->zv_kstat, nwritten);
@ -872,7 +871,8 @@ zvol_discard(void *arg)
ZVOL_OBJ, start, size);
}
unlock:
zfs_range_unlock(zvr->rl);
rangelock_exit(zvr->lr);
if (error == 0 && sync)
zil_commit(zv->zv_zilog, ZVOL_OBJ);
@ -917,7 +917,7 @@ zvol_read(void *arg)
break;
}
}
zfs_range_unlock(zvr->rl);
rangelock_exit(zvr->lr);
int64_t nread = start_resid - uio.uio_resid;
dataset_kstats_update_read_kstats(&zv->zv_kstat, nread);
@ -985,7 +985,7 @@ zvol_request(struct request_queue *q, struct bio *bio)
* are asynchronous, we take it here synchronously to make
* sure overlapped I/Os are properly ordered.
*/
zvr->rl = zfs_range_lock(&zv->zv_range_lock, offset, size,
zvr->lr = rangelock_enter(&zv->zv_rangelock, offset, size,
RL_WRITER);
/*
* Sync writes and discards execute zil_commit() which may need
@ -1014,7 +1014,7 @@ zvol_request(struct request_queue *q, struct bio *bio)
rw_enter(&zv->zv_suspend_lock, RW_READER);
zvr->rl = zfs_range_lock(&zv->zv_range_lock, offset, size,
zvr->lr = rangelock_enter(&zv->zv_rangelock, offset, size,
RL_READER);
if (zvol_request_sync || taskq_dispatch(zvol_taskq,
zvol_read, zvr, TQ_SLEEP) == TASKQID_INVALID)
@ -1036,7 +1036,7 @@ zvol_get_done(zgd_t *zgd, int error)
if (zgd->zgd_db)
dmu_buf_rele(zgd->zgd_db, zgd);
zfs_range_unlock(zgd->zgd_rl);
rangelock_exit(zgd->zgd_lr);
if (error == 0 && zgd->zgd_bp)
zil_lwb_add_block(zgd->zgd_lwb, zgd->zgd_bp);
@ -1072,7 +1072,7 @@ zvol_get_data(void *arg, lr_write_t *lr, char *buf, struct lwb *lwb, zio_t *zio)
* we don't have to write the data twice.
*/
if (buf != NULL) { /* immediate write */
zgd->zgd_rl = zfs_range_lock(&zv->zv_range_lock, offset, size,
zgd->zgd_lr = rangelock_enter(&zv->zv_rangelock, offset, size,
RL_READER);
error = dmu_read_by_dnode(zv->zv_dn, offset, size, buf,
DMU_READ_NO_PREFETCH);
@ -1085,7 +1085,7 @@ zvol_get_data(void *arg, lr_write_t *lr, char *buf, struct lwb *lwb, zio_t *zio)
*/
size = zv->zv_volblocksize;
offset = P2ALIGN_TYPED(offset, size, uint64_t);
zgd->zgd_rl = zfs_range_lock(&zv->zv_range_lock, offset, size,
zgd->zgd_lr = rangelock_enter(&zv->zv_rangelock, offset, size,
RL_READER);
error = dmu_buf_hold_by_dnode(zv->zv_dn, offset, zgd, &db,
DMU_READ_NO_PREFETCH);
@ -1687,7 +1687,7 @@ zvol_alloc(dev_t dev, const char *name)
zv->zv_open_count = 0;
strlcpy(zv->zv_name, name, MAXNAMELEN);
zfs_rlock_init(&zv->zv_range_lock);
rangelock_init(&zv->zv_rangelock, NULL, NULL);
rw_init(&zv->zv_suspend_lock, NULL, RW_DEFAULT, NULL);
zv->zv_disk->major = zvol_major;
@ -1745,7 +1745,7 @@ zvol_free(void *arg)
ASSERT(zv->zv_disk->private_data == NULL);
rw_destroy(&zv->zv_suspend_lock);
zfs_rlock_destroy(&zv->zv_range_lock);
rangelock_fini(&zv->zv_rangelock);
del_gendisk(zv->zv_disk);
blk_cleanup_queue(zv->zv_queue);

View File

@ -121,11 +121,6 @@ function openzfs_port_commit()
error=1
fi
# need a approved by line
if ! check_tagged_line "Approved by" ; then
error=1
fi
# need ported by line
if ! check_tagged_line "Ported-by" ; then
error=1