事务
数据库事务是数据库管理系统执行过程中的一个逻辑单位,由一个有限的数据库操作序列构成。
一个数据库事务通常包含了一个序列的对数据库的读/写操作。它的存在包含有以下两个目的:
为数据库操作序列提供了一个从失败中恢复到正常状态的方法,同时提供了数据库即使在异常状态下仍能保持一致性的方法。
当多个应用程序在并发访问数据库时,可以在这些应用程序之间提供一个隔离方法,以防止彼此的操作互相干扰。
当事务被提交给了DBMS(数据库管理系统),则DBMS(数据库管理系统)需要确保该事务中的所有操作都成功完成且其结果被永久保存在数据库中,如果事务中有的操作没有成功完成,则事务中的所有操作都需要被回滚,回到事务执行前的状态;同时,该事务对数据库或者其他事务的执行无影响,所有的事务都好像在独立的运行。
但在现实的情况下,失败的风险很高。在一个数据库事务的执行过程中,有可能会遇上事务操作失败、数据库系统/操作系统失败,甚至是存储介质失败等等情况。这便需要DBMS对一个执行失败的事务执行恢复操作,将其数据库状态恢复到一致状态(数据的一致性得到保证的状态)。为了实现将数据库状态恢复到一致状态的功能,DBMS通常需要维护事务日志以追踪事务中所有影响数据库数据的操作。
锁
当并发事务同时访问一个资源时,有可能导致数据不一致,因此需要一种机制来将数据访问顺序化,以保证数据库数据的一致性。锁就是其中的一种机制。
https://blog.csdn.net/u011277123/article/details/56281784
同步
https://segmentfault.com/a/1190000006049612
在上一章中,我们学到了如何通过执行器服务同时执行代码。当我们编写这种多线程代码时,我们需要特别注意共享可变变量的并发访问。假设我们打算增加某个可被多个线程同时访问的整数。
我们定义了count
字段,带有increment()
方法来使count
加一:
int count = 0;
void increment() {
count = count + 1;
}
当多个线程并发调用这个方法时,我们就会遇到大麻烦:
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10000)
.forEach(i ->
executor.submit(this::increment));
stop(executor);
System.out.println(count); // 9965
我们没有看到
count
为10000的结果,上面代码的实际结果在每次执行时都不同。原因是我们在不同的线程上共享可变变量,并且变量访问没有同步机制,这会产生
增加一个数值需要三个步骤:(1)读取当前值,(2)使这个值加一,(3)将新的值写到变量。如果两个线程同时执行,就有可能出现两个线程同时执行步骤1,于是会读到相同的当前值。这会导致无效的写入,所以实际的结果会偏小。上面的例子中,对count
的非同步并发访问丢失了35次增加操作,但是你在自己执行代码时会看到不同的结果。幸运的是,Java自从很久之前就通过
synchronized
关键字支持线程同步。我们可以使用synchronized
来修复上面在增加count
时的竞争条件。
synchronized void incrementSync() {
count = count + 1;
}
在我们并发调用incrementSync()
时,我们得到了count
为10000的预期结果。没有再出现任何竞争条件,并且结果在每次代码执行中都很稳定:
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10000)
.forEach(i ->
executor.submit(this::incrementSync));
stop(executor);
System.out.println(count); // 10000
synchronized关键字也可用于语句块:
void incrementSync() {
synchronized (this) {
count = count + 1;
}
}
Java在内部使用所谓的“监视器”(monitor),也称为监视器锁(monitor lock)或内在锁( intrinsic lock)来管理同步。监视器绑定在对象上,例如,当使用同步方法时,每个方法都共享相应对象的相同监视器。
所有隐式的监视器都实现了重入(reentrant)特性。重入的意思是锁绑定在当前线程上。线程可以安全地多次获取相同的锁,而不会产生死锁(例如,同步方法调用相同对象的另一个同步方法)。
锁
并发API支持多种显式的锁,它们由Lock
接口规定,用于代替synchronized
的隐式锁。锁对细粒度的控制支持多种方法,因此它们比隐式的监视器具有更大的开销。
锁的多个实现在标准JDK中提供,它们会在下面的章节中展示。
ReentrantLock
ReentrantLock
类是互斥锁,与通过synchronized
访问的隐式监视器具有相同行为,但是具有扩展功能。就像它的名称一样,这个锁实现了重入特性,就像隐式监视器一样。
让我们看看使用ReentrantLock
之后的上面的例子。
package com.huang.lock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author elephant-huang-zhao
* @date 2017/11/29
*/
public class ReentrantLockTest {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
ReentrantLock lock = new ReentrantLock();
poolExecutor.submit(()->{
lock.lock();
try{
sleep();
}finally {
lock.unlock();
}
});
poolExecutor.submit(()->{
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("locked: "+lock.isLocked());
System.out.println("faired: "+lock.isFair());
System.out.println("Held by me: "+lock.isHeldByCurrentThread());
boolean locked = lock.tryLock();
System.out.println("Locked acquired: "+locked);
});
}
private static void sleep() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
tryLock()
方法是
lock()
方法的替代,它尝试拿锁而不阻塞当前线程。在访问任何共享可变变量之前,必须使用布尔值结果来检查锁是否已经被获取。
ReadWriteLock
ReadWriteLock
接口规定了锁的另一种类型,包含用于读写访问的一对锁。读写锁的理念是,只要没有任何线程写入变量,并发读取可变变量通常是安全的。所以读锁可以同时被多个线程持有,只要没有线程持有写锁。这样可以提升性能和吞吐量,因为读取比写入更加频繁。
package com.huang.lock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author elephant-huang-zhao
* @date 2017/11/29
*/
public class ReadWriteLockTest {
static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
public static void main(String[] args) {
Map<String,String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
Runnable wirteTask = () ->{
lock.writeLock().lock();
try {
sleep(5);
map.put("foo", "bar");
} finally {
lock.writeLock().unlock();
}
};
poolExecutor.submit(wirteTask);
Runnable readTask = () ->{
lock.readLock().lock();
try {
System.out.println(map.get("foo") + System.currentTimeMillis());
sleep(5);
} finally {
lock.readLock().unlock();
}
};
poolExecutor.submit(readTask);
poolExecutor.submit(readTask);
//stop();
}
private static void sleep(long time) {
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void stop() {
try {
System.out.println("attempt to shutdown executor");
poolExecutor.shutdown();
poolExecutor.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.err.println("tasks interrupted");
poolExecutor.shutdownNow();
} finally {
if (!poolExecutor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
poolExecutor.shutdownNow();
System.out.println("shutdown finished");
}
}
}
StampedLock
Java 8 自带了一种新的锁,叫做StampedLock
,它同样支持读写锁,就像上面的例子那样。与ReadWriteLock
不同的是,StampedLock
的锁方法会返回表示为long
的标记。你可以使用这些标记来释放锁,或者检查锁是否有效。此外,StampedLock
支持另一种叫做乐观锁(optimistic locking)的模式。
让我们使用StampedLock
代替ReadWriteLock
重写上面的例子:
package com.huang.lock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
/**
* @author elephant-huang-zhao
* @date 2017/11/30
*/
public class StampedLockTest {
static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
public static void main(String[] args) {
Map<String,String> map = new HashMap<>();
StampedLock stampedLock = new StampedLock();
poolExecutor.submit(() -> {
long stamp = stampedLock.writeLock();
try {
sleep(5);
map.put("foo", "bar");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlockWrite(stamp);
}
});
Runnable readTask = () ->{
long stamp = stampedLock.readLock();
try {
System.out.println(map.get("foo"));
sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlockRead(stamp);
}
};
poolExecutor.submit(readTask);
}
private static void sleep(long time) throws InterruptedException {
TimeUnit.SECONDS.sleep(time);
}
}
通过
readLock()
或
writeLock()
来获取读锁或写锁会返回一个标记,它可以在稍后用于在
finally
块中解锁。要记住
StampedLock
并没有实现重入特性。每次调用加锁都会返回一个新的标记,并且在没有可用的锁时阻塞,即使相同线程已经拿锁了。所以你需要额外注意不要出现死锁。
就像前面的
ReadWriteLock
例子那样,两个读任务都需要等待写锁释放。之后两个读任务同时向控制台打印信息,因为多个读操作不会相互阻塞,只要没有线程拿到写锁。
下面的例子展示了乐观锁:
package com.huang.lock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
/**
* @author elephant-huang-zhao
* @date 2017/11/30
*/
public class OptimisticStampedLockTest {
static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
public static void main(String[] args) {
Map<String,String> map = new HashMap<>();
StampedLock stampedLock = new StampedLock();
poolExecutor.submit(() ->{
long stamp = stampedLock.tryOptimisticRead();
try {
System.out.println("Optimistic Lock valid : "+stampedLock.validate(stamp));
sleep(1);
System.out.println("Optimistic Lock valid : "+stampedLock.validate(stamp));
sleep(2);
System.out.println("Optimistic Lock valid : "+stampedLock.validate(stamp));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlock(stamp);
}
});
poolExecutor.submit(()->{
long stamp = stampedLock.writeLock();
try {
System.out.println("Write lock acquired");
sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
stampedLock.unlockWrite(stamp);
System.out.println("Write done");
}
});
}
private static void sleep(long time) throws InterruptedException {
TimeUnit.SECONDS.sleep(time);
}
}
乐观的读锁通过调用tryOptimisticRead()获取,它总是返回一个标记而不阻塞当前线程,无论锁是否真正可用。如果已经有写锁被拿到,返回的标记等于0。你需要总是通过lock.validate(stamp)检查标记是否有效。
有时,将读锁转换为写锁而不用再次解锁和加锁十分实用。StampedLock为这种目的提供了tryConvertToWriteLock()方法,就像下面那样:
package com.huang.lock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
/**
* @author elephant-huang-zhao
* @date 2017/11/30
*/
public class CovertStampedLockTest {
static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
public static void main(String[] args) {
StampedLock stampedLock = new StampedLock();
poolExecutor.submit(() ->
{
int count = 0;
long stamp = stampedLock.readLock();
try {
if (count == 0) {
stamp = stampedLock.tryConvertToWriteLock(stamp);
if (stamp == 0L) {
System.out.println("Could not convert to write lock");
stamp = stampedLock.writeLock();
}
count = 23;
System.out.println(count);
}
} finally {
stampedLock.unlock(stamp);
}
});
}
private static void sleep(long time) throws InterruptedException {
TimeUnit.SECONDS.sleep(time);
}
}
第一个任务获取读锁,并向控制台打印count字段的当前值。但是如果当前值是零,我们希望将其赋值为23。我们首先需要将读锁转换为写锁,来避免打破其它线程潜在的并发访问。tryConvertToWriteLock()的调用不会阻塞,但是可能会返回为零的标记,表示当前没有可用的写锁。这种情况下,我们调用writeLock()来阻塞当前线程,直到有可用的写锁。
信号量
除了锁之外,并发API也支持计数的信号量。不过锁通常用于变量或资源的互斥访问,信号量可以维护整体的准入许可。这在一些不同场景下,例如你需要限制你程序某个部分的并发访问总数时非常实用。
下面是一个例子,演示了如何限制对通过
sleep(5)
模拟的长时间运行任务的访问:
package com.huang.lock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
/**
* @author elephant-huang-zhao
* @date 2017/11/30
*/
public class SemaphoreTest {
static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 11,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
Runnable longRunningTask = () ->
{
boolean permit = false;
try {
permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
if (permit) {
System.out.println("Semaphore acquired");
sleep(5);
} else {
System.out.println("Could not acquire semaphore");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (permit) {
semaphore.release();
}
}
};
IntConsumer intConsumer = i ->poolExecutor.submit(longRunningTask);
IntStream.range(0, 10).forEach(intConsumer);
poolExecutor.submit(() ->{
while (true) {
System.out.println("单独线程来时刻获取信号量:" + semaphore.tryAcquire(1, TimeUnit.SECONDS));
}
});
}
private static void sleep(long time) throws InterruptedException {
TimeUnit.SECONDS.sleep(time);
}
}
信号量限制对通过
sleep(5)
模拟的长时间运行任务的访问,最大5个线程。每个随后的
tryAcquire()
调用在经过最大为一秒的等待超时之后,会向控制台打印不能获取信号量的结果。