write and debug by 张艳涛
wait()和notify()的通常用法
1、 A线程取得锁,执行wait(),释放锁;;
2、 B线程取得锁,完成业务后执行notify(),再释放锁;;
3、 B线程释放锁之后,A线程取得锁,继续执行wait()之后的代码;
关于synchronize修饰的代码块
通常,对于synchronize(lock){…}这样的代码块,编译后会生成monitorenter和monitorexit指令,线程执行到monitorenter指令时会尝试取得lock对应的monitor的所有权(CAS设置对象头),取得后即获取到锁,执行monitorexit指令时会释放monitor的所有权即释放锁;
一个完整的demo
为了深入学习wait()和notify(),先用完整的demo程序来模拟场景吧,以下是源码:
package com.zyt.wait_notify;
public class NotifyDemo {
private static void sleep(long sleepVal){
try{
Thread.sleep(sleepVal);
}catch(Exception e){
e.printStackTrace();
}
}
private static void log(String desc){
System.out.println(Thread.currentThread().getName() + " : " + desc);
}
Object lock = new Object();
public void startThreadA(){
new Thread(() -> {
synchronized (lock){
log("get lock");
startThreadB();
log("start wait");
try {
}catch(InterruptedException e){
e.printStackTrace();
}
log("get lock after wait");
log("release lock");
}
}, "thread-A").start();
}
public void startThreadB(){
new Thread(()->{
synchronized (lock){
log("get lock");
startThreadC();
sleep(100);
log("start notify");
lock.notify();
log("release lock");
}
},"thread-B").start();
}
public void startThreadC(){
new Thread(() -> {
synchronized (lock){
log("get lock");
log("release lock");
}
}, "thread-C").start();
}
public static void main(String[] args){
new NotifyDemo().startThreadA();
}
}
以上就是本次实战用到的demo,代码功能简述如下:
启动线程A,取得锁之后先启动线程B再执行wait()方法,释放锁并等待;
线程B启动之后会等待锁,A线程执行wait()之后,线程B取得锁,然后启动线程C,再执行notify唤醒线程A,最后退出synchronize代码块,释放锁;
线程C启动之后就一直在等待锁,这时候线程B还没有退出synchronize代码块,锁还在线程B手里;
线程A在线程B执行notify()之后就一直在等待锁,这时候线程B还没有退出synchronize代码块,锁还在线程B手里;
线程B退出synchronize代码块,释放锁之后,线程A和线程C竞争锁;
把上面的代码在Openjdk8下面执行,反复执行多次,都得到以下结果:
thread-A : get lock
thread-A : start wait
thread-B : get lock
thread-C : c thread is start
thread-B : start notify
thread-B : release lock
thread-A : after wait, acquire lock again
thread-A : release lock
thread-C : get lock
thread-C : release lock
针对以上结果,问题来了:
第一个问题:
将以上代码反复执行多次,结果都是B释放锁之后A会先得到锁,这又是为什么呢?C为何不能先拿到锁呢?
第二个问题:
线程C自开始就执行了monitorenter指令,它能得到锁是容易理解的,但是线程A呢?在wait()之后并没有没有monitorenter指令,那么它又是如何取得锁的呢?
wait()、notify()这些方法都是native方法,所以只有从JVM源码寻找答案了,本次阅读的是openjdk8的源码;
带上问题去看JVM源码
按照demo代码执行顺序,我整理了如下问题,带着这些问题去看JVM源码可以聚焦主线,不要被一些支线的次要的代码卡住(例如一些异常处理,监控和上报等):
线程A在wait()的时候做了什么?
线程C启动后,由于此时线程B持有锁,那么线程C此时在干啥?
线程B在notify()的时候做了什么?
线程B释放锁的时候做了什么?
好了,接下来看源码分析问题吧:
线程A在wait()的时候做了什么
java代码中的
lock.wait();
这个lock是一个object对象
如果你看源码的时候能看到这个是一个native方法,jvm对于native方法的处理有俩种方法,
1、 一种是自定义,使用javah生成对应的.hpp的头,在用c++写对于的.cpp方法实现,其中会调用jin.h的env方法,在使用gcc编译成.so文件,;
ObjTest.java
package jni;
class A {}
public class ObjTest extends A{
static {
System.loadLibrary("ObjTest");
}
public ObjTest(){
System.out.println("default");
}
public ObjTest(int age){
System.out.println("param Construtor,age->"+age);
}
public native static void test(Object a);
public static void main(String[] args){
test(new ObjTest());
}
}
2.另外的一种就是系统自带的native,
如果你要看代码的话
#include <stdio.h>
#include <signal.h>
#include <limits.h>
#include "jni.h"
#include "jni_util.h"
#include "jvm.h"
#include "java_lang_Object.h"
static JNINativeMethod methods[] = {
{"hashCode", "()I", (void *)&JVM_IHashCode},
{"wait", "(J)V", (void *)&JVM_MonitorWait},
{"notify", "()V", (void *)&JVM_MonitorNotify},
{"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll},
{"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone},
};
JNIEXPORT void JNICALL
Java_java_lang_Object_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls,
methods, sizeof(methods)/sizeof(methods[0]));
}
JNIEXPORT jclass JNICALL
Java_java_lang_Object_getClass(JNIEnv *env, jobject this)
{
if (this == NULL) {
JNU_ThrowNullPointerException(env, NULL);
return 0;
} else {
return (*env)->GetObjectClass(env, this);
}
}
会发现它使用了 RegisterNatives这种方法,wait对于的方法,为 (void *)&JVM_MonitorWait
在share/vm/prims/jvm.cpp文件中定义的
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
JVMWrapper("JVM_MonitorWait");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
JavaThreadInObjectWaitState jtiows(thread, ms != 0);
if (JvmtiExport::should_post_monitor_wait()) {
JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
}
ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END
重要的是,他要进入ObjectSynchronizer.wait方法
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
//UseBiasedLocking默认为true
if (UseBiasedLocking) {
//撤销对象头中包含的偏向锁
BiasedLocking::revoke_and_rebias(obj, false, THREAD);
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
if (millis < 0) {
TEVENT (wait - throw IAX) ;
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
//分配一个关联的ObjectMonitor实例
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
//调用其wait方法
monitor->wait(millis, true, THREAD);
/* This dummy call is in place to get around dtrace bug 6254741. Once
that's fixed we can uncomment the following line and remove the call */
// DTRACE_MONITOR_PROBE(waited, monitor, obj(), THREAD);
dtrace_waited_probe(monitor, obj, THREAD);
}
这里就能看到了
//调用其wait方法
monitor->wait(millis, true, THREAD);
线程B在这个时候做了什么
在线程A调用wait方法的同时,线程B也已经启动了,就是jvm源码调试中的Thread-10
能看到这个时候线程B,已经进入了synchronized代码块,其中对应的指令就是monitorenter
对于monitorenter指令进行简单分析
先打印一下bt信息
#0 ObjectMonitor::enter (this=0x7fb1280036e0, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/objectMonitor.cpp:323
#1 0x00007fb14a8de3e6 in ObjectSynchronizer::slow_enter (obj=..., lock=0x7fb1345fc6a8, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:258
#2 0x00007fb14a8ddf70 in ObjectSynchronizer::fast_enter (obj=..., lock=0x7fb1345fc6a8, attempt_rebias=true, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:180
#3 0x00007fb14a538e2f in InterpreterRuntime::monitorenter (thread=0x7fb11c001800, elem=0x7fb1345fc6a8) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/interpreter/interpreterRuntime.cpp:573
如果看网上的一般文章那么分析的起点就是InterpreterRuntime::monitorenter ,其实是不对的,也就是99.8%的文章都是差了那么一乃乃,
在上下求索的过程中,发现连mashibing的公开课讲的课,黄俊将的synchronized底层实现都是错的,他错在了哪里了? 他用了openjdk1.8早期的
源码版本来讲偏向锁,其实这个源码还没有实现偏向锁,我也遇到了这个问题,发现怎么看也看不懂,那么真正的入口是
bytecodeInterpreter.cpp
/* monitorenter and monitorexit for locking/unlocking an object */
CASE(_monitorenter): {
oop lockee = STACK_OBJECT(-1);
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
// find a free monitor or one already allocated for this object
// if we find a matching object then we need a new monitor
// since this is recursive enter
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
while (most_recent != limit ) {
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
if (entry != NULL) {
entry->set_obj(lockee);
int success = false;
uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
markOop mark = lockee->mark();
intptr_t hash = (intptr_t) markOopDesc::no_hash;
// implies UseBiasedLocking
// code 3:如果锁对象的mark word的状态是偏向模式
if (mark->has_bias_pattern()) {//has_bias_pattern las 3bit is 101 ?
uintptr_t thread_ident;
uintptr_t anticipated_bias_locking_value;
thread_ident = (uintptr_t)istate->thread();
anticipated_bias_locking_value =
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
~((uintptr_t) markOopDesc::age_mask_in_place);
// code 5:如果偏向的线程是自己且epoch等于class的epoch
if (anticipated_bias_locking_value == 0) {
// already biased towards this thread, nothing to do
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::biased_lock_entry_count_addr())++;
}
success = true;
}
/**
* 初始化:偏向锁默认是延时初始化的,延迟的时间通过参数BiasedLockingStartupDelay控制,默认是4000ms默认是4000ms。
* 初始化是在安全点下通过VMThread完成的,初始化时会把由SystemDictionary维护的所有已加载类的
* Klass的prototype_header修改成匿名偏向锁对象头,并把_biased_locking_enabled静态属性置为true,
* 后续加载新的Klass时发现该属性为true,会将Klass的prototype_header修改成匿名偏向锁对象头。
* 当创建某个Klass的oop时,会利用Klass的prototype_header来初始化该oop的对象头,即偏向锁初始化完成后,
* 后续所有创建的oop的初始对象头都是匿名偏向锁的,在此之前创建的oop初始对象头都是无锁状态的。
*/
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
// try revoke bias
markOop header = lockee->klass()->prototype_header();
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
//_biased_locking_enabled静态属性为false,默认是4000ms,尚未开启偏向锁,所以撤销偏向锁
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
}
// code 7:如果epoch不等于class中的epoch,则尝试重偏向
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
// try rebias
markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
if (hash != markOopDesc::no_hash) {
new_header = new_header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::rebiased_lock_entry_count_addr())++;
}
else {
// 重偏向失败,代表存在多线程竞争,则调用monitorenter方法进行锁升级
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
else {
// 走到这里说明当前要么偏向别的线程,要么是匿名偏向(即没有偏向任何线程)
// code 8:下面构建一个匿名偏向的mark word,尝试用CAS指令替换掉锁对象的mark word
// try to bias towards thread in case object is anonymously biased
markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
(uintptr_t)markOopDesc::age_mask_in_place |
epoch_mask_in_place));
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
// debugging hint
DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
// CAS修改成功
if (PrintBiasedLockingStatistics)
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
else {
// 如果修改失败说明存在多线程竞争,所以进入monitorenter方法
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
}
//if (mark->has_bias_pattern()) {//has_bias_pattern las 3bit is 101 ? } end of code
//如果没符合的mark->has_bias_pattern就到这里了,
// traditional lightweight locking
// 如果偏向线程不是当前线程或没有开启偏向模式等原因都会导致success==false??存疑
if (!success) {
// 轻量级锁的逻辑
//code 9: 构造一个无锁状态的Displaced Mark Word,并将Lock Record的lock指向它
markOop displaced = lockee->mark()->set_unlocked();
entry->lock()->set_displaced_header(displaced);
//如果指定了-XX:+UseHeavyMonitors,则call_vm=true,代表禁用偏向锁和轻量级锁
bool call_vm = UseHeavyMonitors;
// 利用CAS将对象头的mark word替换为指向Lock Record的指针,(如果是无锁状态,直接升级轻量级锁)
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// Is it simple recursive case?
// 判断是不是锁重入,进入这里说明obj是不是无锁状态,
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
//code 10: 如果是锁重入,则直接将Displaced Mark Word设置为null
entry->lock()->set_displaced_header(NULL);
} else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
// lock record不够,重新执行
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
/**
* JVM中的每个类也有一个类似mark word的prototype_header,用来标记该class的epoch和偏向开关等信息。
* 上面的代码中lockee->klass()->prototype_header()即获取class的prototype_header。
code 1,从当前线程的栈中找到一个空闲的Lock Record(即代码中的BasicObjectLock,下文都用Lock Record代指),
判断Lock Record是否空闲的依据是其obj字段 是否为null。
注意这里是按内存地址从低往高找到最后一个可用的Lock Record,换而言之,就是找到内存地址最高的可用Lock Record。
code 2,获取到Lock Record后,首先要做的就是为其obj字段赋值。
code 3,判断锁对象的mark word是否是偏向模式,即低3位是否为101。
code 4,这里有几步位运算的操作 anticipated_bias_locking_value =
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
~((uintptr_t) markOopDesc::age_mask_in_place); 这个位运算可以分为3个部分。
第一部分((uintptr_t)lockee->klass()->prototype_header() | thread_ident)
将当前线程id和类的prototype_header相或,这样得到的值为
(当前线程id + prototype_header中的(epoch + 分代年龄 + 偏向锁标志 + 锁标志位))
,注意prototype_header的分代年龄那4个字节为0
第二部分 ^ (uintptr_t)mark 将上面计算得到的结果与锁对象的markOop进行异或,
相等的位全部被置为0,只剩下不相等的位。
第三部分 & ~((uintptr_t) markOopDesc::age_mask_in_place) markOopDesc::age_mask_in_place为...0001111000,
取反后,变成了...1110000111,除了分代年龄那4位,其他位全为1;
将取反后的结果再与上面的结果相与,将上面异或得到的结果中分代年龄给忽略掉。
code 5,anticipated_bias_locking_value==0代表偏向的线程是当前线程且mark word的epoch等于class的epoch,这种情况下什么都不用做。
code 6,(anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0
代表class的prototype_header或对象的mark word中偏向模式是关闭的,
又因为能走到这已经通过了mark->has_bias_pattern()判断,
即对象的mark word中偏向模式是开启的,那也就是说class的prototype_header不是偏向模式。
然后利用CAS指令Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark撤销偏向锁,
我们知道CAS会有几个参数,1是预期的原值,2是预期修改后的值 ,3是要修改的对象,
与之对应,cmpxchg_ptr方法第一个参数是预期修改后的值,第2个参数是修改的对象,第3个参数是预期原值,
方法返回实际原值,如果等于预期原值则说明修改成功。
code 7,如果epoch已过期,则需要重偏向,利用CAS指令将锁对象的mark word替换为一个偏向当前线程且epoch为类的epoch的新的mark word。
code 8,CAS将偏向线程改为当前线程,如果当前是匿名偏向则能修改成功,否则进入锁升级的逻辑。
code 9,这一步已经是轻量级锁的逻辑了。从上图的mark word的格式可以看到,
轻量级锁中mark word存的是指向Lock Record的指针。这里构造一个无锁状态的mark word,
然后存储到Lock Record(Lock Record的格式可以看第一篇文章)。设置mark word是无锁状态的原因是:
轻量级锁解锁时是将对象头的mark word设置为Lock Record中的Displaced Mark Word,
所以创建时设置为无锁状态,解锁时直接用CAS替换就好了。
code 10, 如果是锁重入,则将Lock Record的Displaced Mark Word设置为null,起到一个锁重入计数的作用。
以上是偏向锁加锁的流程(包括部分轻量级锁的加锁流程),如果当前锁已偏向其他线程||epoch值过期||偏向模式关闭||获取偏向锁的过程中存在并发冲突,
都会进入到InterpreterRuntime::monitorenter方法, 在该方法中会对偏向锁撤销和升级。
*/
这个比较复杂,如果你下载的版本是hotspot-87ee5ee27509那么你会看到源码为
/* monitorenter and monitorexit for locking/unlocking an object */
CASE(_monitorenter): {
oop lockee = STACK_OBJECT(-1);
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
// find a free monitor or one already allocated for this object
// if we find a matching object then we need a new monitor
// since this is recursive enter
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
while (most_recent != limit ) {
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
if (entry != NULL) {
entry->set_obj(lockee);
markOop displaced = lockee->mark()->set_unlocked();
entry->lock()->set_displaced_header(displaced);
if (Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// Is it simple recursive case?
if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
entry->lock()->set_displaced_header(NULL);
} else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
这个真是个坑,那么分析一下这个过程
接着进入fast_enter
直接进入,
如果进入inflate方法,
会有其中情况:
- // The mark can be in one of the following states:
- // * Inflated - just return
- // * Stack-locked - coerce it to inflated
- // * INFLATING - busy wait for conversion to complete
- // * Neutral - aggressively inflate the object.
- // * BIASED - Illegal. We should never see this
- ===以上是源码注解
- 如果膨胀过了,锁已经是 ObjectMonitor,那么直接返回
- 如果是Stack-locked,就是轻量级锁,那么锁膨胀
- 如果膨胀中
- 如果无锁Neutral,直接膨胀为重量级锁
- 如果biased偏向锁,非法,因为上一个方法有判断,如果是偏向锁,直接返回不会到到达这里的
这里只贴了,锁此刻为重量级锁的时候,
那么返回的 ObjectMonitor * m 进入了enter方法
void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;
//原子的设置owner属性,如果_owner属性是NULL就将其设置为Self,否则返回当前的_owner属性
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {
//设置成功,说明该Monitor没有被人占用
// Either ASSERT _recursions == 0 or explicitly set _recursions = 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert OwnerIsThread == 1
return ;
}
if (cur == Self) {
//设置失败,说明该Monitor就是当前线程占用的,此处进入enter是嵌套加锁情形
// TODO-FIXME: check for integer overflow! BUGID 6557169.
_recursions ++ ;
return ;
}
//轻量级锁膨胀成重量级锁时,将owner设置为lock属性
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
_recursions = 1 ;
// Commute owner from a thread-specific on-stack BasicLockObject address to
// a full-fledged "Thread *".
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
//该Monitor被其他某个线程占用了,需要抢占
// We've encountered genuine contention.
assert (Self->_Stalled == 0, "invariant") ;
//记录需要抢占的Monitor指针
Self->_Stalled = intptr_t(this) ;
//Knob_SpinEarly默认为1,即为true
//TrySpin让当前线程自旋,自旋的次数默认可以自适应调整,如果进入安全点同步则退出自旋,返回1表示抢占成功
// Try one round of spinning *before* enqueueing Self
// and before going through the awkward and expensive state
// transitions. The following spin is strictly optional ...
// Note that if we acquire the monitor from an initial spin
// we forgo posting JVMTI events and firing DTRACE probes.
if (Knob_SpinEarly && TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_recursions == 0 , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
Self->_Stalled = 0 ;
return ;
}
//自旋若干次数后依然抢占失败
assert (_owner != Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (Self->is_Java_thread() , "invariant") ;
JavaThread * jt = (JavaThread *) Self ;
assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ;
assert (jt->thread_state() != _thread_blocked , "invariant") ;
assert (this->object() != NULL , "invariant") ;
assert (_count >= 0, "invariant") ;
//原子的将_count属性加1,表示增加了一个抢占该锁的线程
// Prevent deflation at STW-time. See deflate_idle_monitors() and is_busy().
// Ensure the object-monitor relationship remains stable while there's contention.
Atomic::inc_ptr(&_count);
EventJavaMonitorEnter event;
{ // Change java thread status to indicate blocked on monitor enter.
//修改Java线程状态为BLOCKED_ON_MONITOR_ENTER,此代码块退出后还原成原来的
JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);
DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_enter()) {
JvmtiExport::post_monitor_contended_enter(jt, this);
}
OSThreadContendState osts(Self->osthread());
ThreadBlockInVM tbivm(jt);
Self->set_current_pending_monitor(this);
// TODO-FIXME: change the following for(;;) loop to straight-line code.
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()
//会通过自旋,park等方式不断循环尝试获取锁,直到成功获取锁为止
EnterI (THREAD) ;
if (!ExitSuspendEquivalent(jt)) break ;
//
// We have acquired the contended monitor, but while we were
// waiting another thread suspended us. We don't want to enter
// the monitor while suspended because that would surprise the
// thread that suspended us.
//
_recursions = 0 ;
_succ = NULL ;
exit (false, Self) ;
jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}
Atomic::dec_ptr(&_count);
assert (_count >= 0, "invariant") ;
Self->_Stalled = 0 ;
// Must either set _recursions = 0 or ASSERT _recursions == 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
// The thread -- now the owner -- is back in vm mode.
// Report the glorious news via TI,DTrace and jvmstat.
// The probe effect is non-trivial. All the reportage occurs
// while we hold the monitor, increasing the length of the critical
// section. Amdahl's parallel speedup law comes vividly into play.
//
// Another option might be to aggregate the events (thread local or
// per-monitor aggregation) and defer reporting until a more opportune
// time -- such as next time some thread encounters contention but has
// yet to acquire the lock. While spinning that thread could
// spinning we could increment JVMStat counters, etc.
DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_entered()) {
JvmtiExport::post_monitor_contended_entered(jt, this);
}
if (event.should_commit()) {
event.set_klass(((oop)this->object())->klass());
event.set_previousOwner((TYPE_JAVALANGTHREAD)_previous_owner_tid);
event.set_address((TYPE_ADDRESS)(uintptr_t)(this->object_addr()));
event.commit();
}
if (ObjectMonitor::_sync_ContendedLockAttempts != NULL) {
ObjectMonitor::_sync_ContendedLockAttempts->inc() ;
}
}
// Caveat: TryLock() is not necessarily serializing if it returns failure.
// Callers must compensate as needed.
int ObjectMonitor::TryLock (Thread * Self) {
for (;;) {
void * own = _owner ;
if (own != NULL) return 0 ;
if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
// Either guarantee _recursions == 0 or set _recursions = 0.
assert (_recursions == 0, "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert that OwnerIsThread == 1
return 1 ;
}
// The lock had been free momentarily, but we lost the race to the lock.
// Interference -- the CAS failed.
// We can either return -1 or retry.
// Retry doesn't make as much sense because the lock was just acquired.
if (true) return -1 ;
}
}
标记了比较重要的方法,主要流程就是
- cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; cas,期望无人使用锁,(即 MonitorObject._owner为0x0),
- 比如这个时候线程A 会在wati方法在释放锁之前这个_owner的值为ThreadA线程的一个包装对象,那么此时需要等待锁
- 在ThreadA释放锁之后,这个时候如果进入执行这个方法那么,直接回返回,代表已经获取了锁
-
锁进行等待的代码主要是EnterI ,主要实现如下
for (;;) { if (TryLock (Self) > 0) break ; assert (_owner != Self, "invariant") ; if ((SyncFlags & 2) && _Responsible == NULL) { Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ; } // park self if (_Responsible == Self || (SyncFlags & 1)) { TEVENT (Inflated enter - park TIMED) ; Self->_ParkEvent->park ((jlong) RecheckInterval) ; // Increase the RecheckInterval, but clamp the value. RecheckInterval *= 8 ; if (RecheckInterval > 1000) RecheckInterval = 1000 ; } else { TEVENT (Inflated enter - park UNTIMED) ; Self->_ParkEvent->park() ; } if (TryLock(Self) > 0) break ; // The lock is still contested. // Keep a tally of the # of futile wakeups. // Note that the counter is not protected by a lock or updated by atomics. // That is by design - we trade "lossy" counters which are exposed to // races during updates for a lower probe effect. TEVENT (Inflated enter - Futile wakeup) ; if (ObjectMonitor::_sync_FutileWakeups != NULL) { ObjectMonitor::_sync_FutileWakeups->inc() ; } ++ nWakeups ; // Assuming this is not a spurious wakeup we'll normally find _succ == Self. // We can defer clearing _succ until after the spin completes // TrySpin() must tolerate being called with _succ == Self. // Try yet another round of adaptive spinning. if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ; // We can find that we were unpark()ed and redesignated _succ while // we were spinning. That's harmless. If we iterate and call park(), // park() will consume the event and return immediately and we'll // just spin again. This pattern can repeat, leaving _succ to simply // spin on a CPU. Enable Knob_ResetEvent to clear pending unparks(). // Alternately, we can sample fired() here, and if set, forgo spinning // in the next iteration. if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) { Self->_ParkEvent->reset() ; OrderAccess::fence() ; } if (_succ == Self) _succ = NULL ; // Invariant: after clearing _succ a thread *must* retry _owner before parking. OrderAccess::fence() ; }
上边的方法里面可以主要看出
- TryLock(Self),试着cas一把,如果没人占用锁就,占用锁
-
Self->_ParkEvent->park ((jlong) RecheckInterval) ; 线程挂起一段时间
- 醒了之后,接着TryLock(Self),若不成功则一直循环,
接着看线程A在wait()的时候做了什么
此时线程B在一直循环等待获取锁,线程A开始执行wait方法,2者是并行执行的
可以看到其实Thread9 就是线程A在jvm中对应的线程
那么看到在threadA执行完wait方法之后,先释放了锁.在把自己所对应的linux线程挂起
那么还查threadB.notify分析
流程:
lock.notify() //调用lock对象的方法
--> //这个obj就是这个lock,将lock作为参数
ObjectSynchronizer::notify(obj, CHECK);
--> //找到obj对象上的重量锁ObjectMonitor
ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
--> // 调用锁上的notify方法
void ObjectMonitor::notify(TRAPS)
那么现在进入关键步骤
进入的去DequeueWaiter()
接着
打印
(gdb) p _WaitSet
$1= (ObjectWaiter * volatile) 0x7f88bbbfa110
(gdb) p *node
$6 = (ObjectWaiter) {
= {
= {
_vptr.AllocatedObj = 0x7f88d66797f0
}, },
members of ObjectWaiter:
_next = 0x7f88bbbfa110,
_prev = 0x7f88bbbfa110,
_thread = 0x7f88d0160800,
_notifier_tid = 140225242177872,
_event = 0x7f88d0161b00,
_notified = 0,
TState = ObjectWaiter::TS_WAIT,
_Sorted = (unknown: 3019902016),
_active = false
}
看这个链表节点的next和priev都是自己
ObjectWaiter* next = node->_next; 找到下一个节点node3
if(next == node) { //如果next指向自己说明,只有一个node节点,那么久就将_WaitSet置空
assert(node->_prev == node, "invariant check");
_WaitSet = NULL;
}else { //不进入这里,这里是中间节点的情况
ObjectWaiter* prev = node->_prev; 找到上一个节点 ,命名node1
next->_prev = prev; 令下一个节点的上一个指向 node1
prev->_next = next; 令node1的_next指针指向node3
if (_WaitSet == node) {
_WaitSet = next;
}
}
node->_next = NULL; 接着道这里,将node节点的next ,priev置空
node->_prev = NULL;
这就
那么这样就取出来了objectWaiter 对象
接下来对ObjectWaiter对象的处理方式,根据Policy的不同而不同:
Policy == 0:放入_EntryList队列的排头位置;
Policy == 1:放入_EntryList队列的末尾位置;
Policy == 2:_EntryList队列为空就放入_EntryList,否则放入_cxq队列的排头位置;
那么接着进入ThreadB的monitorexit指令
小结一下,线程B释放了锁之后,执行的操作如下:
偏向锁逻辑,此处未命中;
根据QMode的不同,将ObjectWaiter从_cxq或者_EntryList中取出后唤醒;
唤醒的元素会继续执行挂起前的代码,按照我们之前的分析,线程唤醒后,就会通过CAS去竞争锁,此时由于线程B已经释放了锁,那么此时应该能竞争成功;
唤醒目标线程,就是A之前wait方法里面挂起了,那么接着看wait方法的实现
还有
//ReenterI和EnterI的逻辑基本相同,用于获取对象锁
void ATTR ObjectMonitor::ReenterI (Thread * Self, ObjectWaiter * SelfNode) {
assert (Self != NULL , "invariant") ;
assert (SelfNode != NULL , "invariant") ;
assert (SelfNode->_thread == Self , "invariant") ;
assert (_waiters > 0 , "invariant") ;
//校验目标对象的对象头就是当前ObjectMonitor的指针
assert (((oop)(object()))->mark() == markOopDesc::encode(this) , "invariant") ;
assert (((JavaThread *)Self)->thread_state() != _thread_blocked, "invariant") ;
JavaThread * jt = (JavaThread *) Self ;
int nWakeups = 0 ;
for (;;) {
ObjectWaiter::TStates v = SelfNode->TState ;
//校验状态
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
assert (_owner != Self, "invariant") ;
//尝试获取锁
if (TryLock (Self) > 0) break ;
//尝试自旋获取锁
if (TrySpin (Self) > 0) break ;
TEVENT (Wait Reentry - parking) ;
{
//修改线程状态
OSThreadContendState osts(Self->osthread());
ThreadBlockInVM tbivm(jt);
jt->set_suspend_equivalent();
//SyncFlags默认是0
if (SyncFlags & 1) {
Self->_ParkEvent->park ((jlong)1000) ;
} else {
Self->_ParkEvent->park () ;
}
// were we externally suspended while we were waiting?
for (;;) {
//ExitSuspendEquivalent默认返回false
if (!ExitSuspendEquivalent (jt)) break ;
if (_succ == Self) { _succ = NULL; OrderAccess::fence(); }
jt->java_suspend_self();
jt->set_suspend_equivalent();
}
}
//尝试获取锁
if (TryLock(Self) > 0) break ;
TEVENT (Wait Reentry - futile wakeup) ;
++ nWakeups ;
// Assuming this is not a spurious wakeup we'll normally
// find that _succ == Self.
if (_succ == Self) _succ = NULL ;
// Invariant: after clearing _succ a contending thread
// *must* retry _owner before parking.
OrderAccess::fence() ;
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
}//for循环结束
//for循环结束,已经获取了锁
assert (_owner == Self, "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
//从链表中移除
UnlinkAfterAcquire (Self, SelfNode) ;
if (_succ == Self) _succ = NULL ;
assert (_succ != Self, "invariant") ;
//修改状态为TS_RUN
SelfNode->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ; // see comments at the end of EnterI()
}
本文结束