redis缓存为什么要延时双删

技术博客 (244) 2023-12-14 09:01:02

一、只先删缓存

问题:先删缓存,在改库前,其他事务又把旧数据放到缓存里去了。

redis缓存为什么要延时双删 (https://mushiming.com/) 技术博客 第1张

二、只后删缓存

问题1:改了库,清理缓存前,有部分事务还是会拿到旧缓存

redis缓存为什么要延时双删 (https://mushiming.com/) 技术博客 第2张

问题2:为了避免缓存清除失败带来的风险,起到在每次操作数据库之前,都还原到没有产生缓存之前的效果。(当然如果能够做好回滚与配置好数据库隔离级别,这条可以忽略)
redis缓存为什么要延时双删 (https://mushiming.com/) 技术博客 第3张

三、普通双删

问题:第一次清空缓存后、更新数据库前:其他事务查询了数据库hang住
第二次清空缓存后:其他事务更新缓存,此时又会把旧数据更新到缓存

redis缓存为什么要延时双删 (https://mushiming.com/) 技术博客 第4张

四、为什么需要延时双删?

在三中,第二次清空缓存之前,多延时一会儿,等B更新缓存结束了,再删除缓存,这样就缓存就不存在了,其他事务查询到的为新缓存。

延时是确保 修改数据库 -> 清空缓存前,其他事务的更改缓存操作已经执行完。
redis缓存为什么要延时双删 (https://mushiming.com/) 技术博客 第5张

五、以上策略还能不能完善

四中说到,采用延时删最后一次缓存,但这其中难免还是会大量的查询到旧缓存数据的。
redis缓存为什么要延时双删 (https://mushiming.com/) 技术博客 第6张
这时候可以通过加锁来解决,一次性不让太多的线程都来请求,另外从图上看,我们可以尽量缩短第一次删除缓存更新数据库的时间差,这样可以使得其他事务第一时间获取到更新数据库后的数据。另外,该方式(第5种,相对第2种,只后删缓存的,可以大大的减少获取到旧缓存的数量)

可以参考:缓存处理流程

参考redis缓存为什么要延时双删

更细粒度的(Key级别)互斥锁

分段锁使用注意事项

分段锁

/** * * @Explain: key锁(要保证key的hashCode不变,否则无法释放锁。即加锁之后不要手动更改lockMap) */
@Component
public class LoadKeyLock<T> { 
   
    //默认分段数量
    private Integer segments = 16;
    private final HashMap<Integer, ReentrantLock> lockMap = new HashMap<>();
    public LoadKeyLock() { 
   
        init(null, false);
    }
    public LoadKeyLock(Integer counts, boolean fair) { 
   
        init(counts, fair);
    }
    private void init(Integer counts, boolean fair) { 
   
        if (counts != null) { 
   
            segments = counts;
        }
        for (int i = 0; i < segments; i++) { 
   
            lockMap.put(i, new ReentrantLock(fair));
        }
    }
    public void lock(T key) { 
   
        ReentrantLock lock = lockMap.get(key.hashCode() % segments);
        lock.lock();
    }
    public void unlock(T key) { 
   
        ReentrantLock lock = lockMap.get(key.hashCode() % segments);
        lock.unlock();
    }
}

哈希锁

分段锁的基础上发展起来的第二种锁策略,目的是实现真正意义上的细粒度锁。每个哈希值不同的对象都能获得自己独立的锁。

public class HashLock<T> { 
   
    private boolean isFair = false;
    private final SegmentLock<T> segmentLock = new SegmentLock<>();//分段锁
    private final ConcurrentHashMap<T, LockInfo> lockMap = new ConcurrentHashMap<>();

    public HashLock() { 
   
    }

    public HashLock(boolean fair) { 
   
        isFair = fair;
    }

    public void lock(T key) { 
   
        LockInfo lockInfo;
        segmentLock.lock(key);
        try { 
   
            lockInfo = lockMap.get(key);
            if (lockInfo == null) { 
   
                lockInfo = new LockInfo(isFair);
                lockMap.put(key, lockInfo);
            } else { 
   
                lockInfo.count.incrementAndGet();
            }
        } finally { 
   
            segmentLock.unlock(key);
        }
        lockInfo.lock.lock();
    }

    public void unlock(T key) { 
   
        LockInfo lockInfo = lockMap.get(key);
        if (lockInfo.count.get() == 1) { 
   
            segmentLock.lock(key);
            try { 
   
                if (lockInfo.count.get() == 1) { 
   
                    lockMap.remove(key);
                }
            } finally { 
   
                segmentLock.unlock(key);
            }
        }
        lockInfo.count.decrementAndGet();
        lockInfo.unlock();
    }

    private static class LockInfo { 
   
        public ReentrantLock lock;
        public AtomicInteger count = new AtomicInteger(1);

        private LockInfo(boolean fair) { 
   
            this.lock = new ReentrantLock(fair);
        }

        public void lock() { 
   
            this.lock.lock();
        }

        public void unlock() { 
   
            this.lock.unlock();
        }
    }
}

弱引用锁

哈希锁因为引入的分段锁来保证锁创建和销毁的同步,这个锁的思想是借助java的弱引用来创建锁,把锁的销毁交给jvm的垃圾回收,来避免额外的消耗。

/** * 弱引用锁,为每个独立的哈希值提供独立的锁功能 */
public class WeakHashLock<T> { 
   
    private ConcurrentHashMap<T, WeakLockRef<T, ReentrantLock>> lockMap = new ConcurrentHashMap<>();
    private ReferenceQueue<ReentrantLock> queue = new ReferenceQueue<>();

    public ReentrantLock get(T key) { 
   
        if (lockMap.size() > 1000) { 
   
            clearEmptyRef();
        }
        WeakReference<ReentrantLock> lockRef = lockMap.get(key);
        ReentrantLock lock = (lockRef == null ? null : lockRef.get());
        while (lock == null) { 
   
            lockMap.putIfAbsent(key, new WeakLockRef<>(new ReentrantLock(), queue, key));
            lockRef = lockMap.get(key);
            lock = (lockRef == null ? null : lockRef.get());
            if (lock != null) { 
   
                return lock;
            }
            clearEmptyRef();
        }
        return lock;
    }

    @SuppressWarnings("unchecked")
    private void clearEmptyRef() { 
   
        Reference<? extends ReentrantLock> ref;
        while ((ref = queue.poll()) != null) { 
   
            WeakLockRef<T, ? extends ReentrantLock> weakLockRef = (WeakLockRef<T, ? extends ReentrantLock>) ref;
            lockMap.remove(weakLockRef.key);
        }
    }

    private static final class WeakLockRef<T, K> extends WeakReference<K> { 
   
        final T key;

        private WeakLockRef(K referent, ReferenceQueue<? super K> q, T key) { 
   
            super(referent, q);
            this.key = key;
        }
    }
}

适合耗时长场景的互斥key锁

一个细粒度的锁,在某些场景能比synchronized,ReentrantLock等获得更高的并行度更好的性能

public class KeyLock<K> { 
   
    // 保存所有锁定的KEY及其信号量
    private final ConcurrentMap<K, Semaphore> map = new ConcurrentHashMap<K, Semaphore>();
    // 保存每个线程锁定的KEY及其锁定计数
    private final ThreadLocal<Map<K, LockInfo>> local = new ThreadLocal<Map<K, LockInfo>>() { 
   
        @Override
        protected Map<K, LockInfo> initialValue() { 
   
            return new HashMap<K, LockInfo>();
        }
    };

    /** * 锁定key,其他等待此key的线程将进入等待,直到调用{@link #unlock(K)} * 使用hashcode和equals来判断key是否相同,因此key必须实现{@link #hashCode()}和 * {@link #equals(Object)}方法 * * @param key */
    public void lock(K key) { 
   
        if (key == null)
            return;
        LockInfo info = local.get().get(key);
        if (info == null) { 
   
            Semaphore current = new Semaphore(1);
            current.acquireUninterruptibly();
            Semaphore previous = map.put(key, current);
            if (previous != null)
                previous.acquireUninterruptibly();
            local.get().put(key, new LockInfo(current));
        } else { 
   
            info.lockCount++;
        }
    }
    /** * 释放key,唤醒其他等待此key的线程 * @param key */
    public void unlock(K key) { 
   
        if (key == null)
            return;
        LockInfo info = local.get().get(key);
        if (info != null && --info.lockCount == 0) { 
   
            info.current.release();
            map.remove(key, info.current);
            local.get().remove(key);
        }
    }
 
    /** * 锁定多个key * 建议在调用此方法前先对keys进行排序,使用相同的锁定顺序,防止死锁发生 * @param keys */
    public void lock(K[] keys) { 
   
        if (keys == null)
            return;
        for (K key : keys) { 
   
            lock(key);
        }
    }

    /** * 释放多个key * @param keys */
    public void unlock(K[] keys) { 
   
        if (keys == null)
            return;
        for (K key : keys) { 
   
            unlock(key);
        }
    }

    private static class LockInfo { 
   
        private final Semaphore current;
        private int lockCount;

        private LockInfo(Semaphore current) { 
   
            this.current = current;
            this.lockCount = 1;
        }
    }

这里再推荐一篇:缓存处理流程

THE END

发表回复