达芬奇密码

除了技术,无它

439篇博客

Java Synchronized JVM实现分析(上篇)

达芬奇密码2018-08-17 15:13
在JVM中对于Synchronized的实现逻辑跟AQS处理逻辑很相似,如果对于AQS很熟悉的同学很快就可以理解JVM中的处理逻辑了。 

在Java 语言中存在两种内建的synchronized语法:
  1. synchronize代码块
  2. synchronized方法
对于Synchronized代码块而言,在java编译的时候会在同步代码块的入口位置分别插入monitorenter和monitorexit字节码指令。
而synchronized方法在VM字节码指令层面没有任何特殊的指令来实现,而是在Class文件的方法中将该方法的access_flag中的synchronized标志设置为1,表示该方法是同步方法。 
public static synchronized void synchronizedMethod();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
    Code:
      stack=2, locals=0, args_size=0
         0: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;
         3: ldc           #3                  // String 同步方法
         5: invokevirtual #4                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
         8: return
      LineNumberTable:
        line 8: 0
        line 9: 8

  public static void synchronizedBlock();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=2, args_size=0
         0: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;
         3: ldc           #5                  // String 进入同步代码块
         5: invokevirtual #4                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
         8: getstatic     #6                  // Field lock:Ljava/lang/Integer;
        11: dup
        12: astore_0
        13: monitorenter
        14: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;
        17: ldc           #7                  // String 同步代码块
        19: invokevirtual #4                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
        22: aload_0
        23: monitorexit
        24: goto          32
        27: astore_1
        28: aload_0
        29: monitorexit
        30: aload_1
        31: athrow
        32: return 

    很多时候会说synchronized比较重量,为什么这么说呢?这也是因为在JDK1.6 之前 monitorenter moniterexit是依赖与操作系统的Mutex Lock 来实现的 但是由于使用Mutex Lock需要将当前线程挂起并从用户态切换到内核态来执行,这种切换的代价是非常昂贵的;然而在现实中的大部分情况下,同步方法是运行在单线程环境(无锁竞争环境)如果每次都调用Mutex Lock那么将严重的影响程序的性能。不过在1.6中对锁的实现做了大量的优化,如锁消除(Lock Elimination) ,偏向锁(Biased Locking) ,轻量级锁(Lightweight Locking), 粗化锁(Lock Coarsening),  自适用自旋(Adaptive Spinning)等技术来减少锁的开销。
  • Lock Coarsening: 什么是粗化锁呢?就是锁合并,如果有多个连在一起的lock, unlock 操作,和将其优化为一个范围更大的锁
  • Lock Elimination: 通过运行时JIT编译器的逃逸分析来消除一些没有在当前同步块意外被其它线程共享的数据的锁保护。
  • Lightweight Locking: 轻量级锁,是一种乐观锁,它基于一种乐观情况即真实的情况中,大部分同步代码一般都是处于无锁竞争状态,即对于共享资源多线程是顺序占用的,在没有竞争的情况下完全可以避免调用操作系统的重量级锁,而是使用CAS执行更新对象markword指向锁记录的指针。但是如果CAS失败了怎么办呢?这个时候就将锁膨胀为重量级锁,CAS失败了表示锁已经被占用了,膨胀为重量级锁还是无法占用资源?那么为什么要这么处理呢?是因该如果当前锁是重量级锁,在该锁后面的线程都是重量级锁,挂起等待,自身则则是自旋,尝试获取锁。
  • Biased Locking: 如果在无锁竞争的情况下使用轻量级锁,还是有点浪费,因为轻量级锁每次都需要进行CAS更新Lock Record;此时使用了Biased Locking 将markword中将当前的线程ID设置到对象的markword中,当前线程再次获取锁的时候就不需要再次CAS更新线程ID了,可以直接执行同步代码. 偏向锁可以提高带有同步但无竞争的程序性能,但如果程序中大多数的锁总是被多个不同的线程访问,那偏向模式就是多余的。在具体情形分析下,禁止偏向锁优反而可能提升性能。-XX:-UseBiasedLocking=false
  • 自旋(spinning):如果看过AQS源码的同学应该很清楚自旋的实现方式,它是基于一种假设,如果持有锁的线程在在很短的时间内就会释放共享资源,那么那些竞争共享资源的线程只要稍等一下就可以了;怎么等呢?就是空转(自旋),等持有锁的线程释放锁以后就可以立即获取到锁,这样可以避免用户线程和内核线程切换的开销,但是线程自旋是需要消耗CPU的,如果一直获取不到锁,那么线程就一直占用CPU在做无用功。
  • 自适用自旋:就是JVM中进行了优化自旋,如果自旋达到了一定次数就不要在自旋了直接挂起 
首先介绍一下markword
  enum { locked_value             = 0, //00 轻量级锁
         unlocked_value           = 1, //01 无锁
         monitor_value            = 2, //10 重量级锁
         marked_value             = 3, //11 GC标记 
         biased_lock_pattern      = 5  //101  偏向锁
  };
Java是解释执行的,在对字节码进行解释的源码如下:
/* monitorenter and monitorexit for locking/unlocking an object */

      CASE(_monitorenter): {
    	//获取锁对象
    	oop lockee = STACK_OBJECT(-1);
        // derefing's lockee ought to provoke implicit null check
        CHECK_NULL(lockee);
        // find a free monitor or one already allocated for this object
        // if we find a matching object then we need a new monitor
        // since this is recursive enter
        BasicObjectLock* limit = istate->monitor_base();
        BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
        BasicObjectLock* entry = NULL;
        while (most_recent != limit ) {
          if (most_recent->obj() == NULL) entry = most_recent;
          else if (most_recent->obj() == lockee) break;
          most_recent++;
        }
        //锁记录不为空
        if (entry != NULL) {
          //设置锁记录的锁对象为当前锁对象
          entry->set_obj(lockee);
          //默认偏向锁失败
          int success = false;
          //获取epoch
          uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
          //获取锁的markword
          markOop mark = lockee->mark();
          //hash=0
          intptr_t hash = (intptr_t) markOopDesc::no_hash;
          //是否可以偏向
          if (mark->has_bias_pattern()) {
            uintptr_t thread_ident;
            uintptr_t anticipated_bias_locking_value;
            //当前线程ID
            thread_ident = (uintptr_t)istate->thread();
            //((uintptr_t)lockee->klass()->prototype_header() | thread_ident 计算当前线程如果有偏向锁预期的值(不考虑age)
            //和锁对象的markword进行异或操作(相同为0)计算出两个markword的差异
            anticipated_bias_locking_value =
              (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
              ~((uintptr_t) markOopDesc::age_mask_in_place);
            //anticipated_bias_locking_value=0 表示当前偏向锁就是当前线程占有的且epoch值也是相同的,就不在做任何事情了
            if  (anticipated_bias_locking_value == 0) {

              if (PrintBiasedLockingStatistics) {
                (* BiasedLocking::biased_lock_entry_count_addr())++;
              }
              success = true;
            }
            else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
            	//如果偏向标记设置为不可偏向则需要撤销偏向
              markOop header = lockee->klass()->prototype_header();
              if (hash != markOopDesc::no_hash) {
                header = header->copy_set_hash(hash);
              }
              if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
                if (PrintBiasedLockingStatistics)
                  (*BiasedLocking::revoked_lock_entry_count_addr())++;
              }
            }
            else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
            	//如果epoch值不同则表示需要重偏向(在异或操作中如果相同应该为0,否则为1)此时不为0表示对象头中的epoch值已经修改了,表示需要重偏向
            	markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
              if (hash != markOopDesc::no_hash) {
                new_header = new_header->copy_set_hash(hash);
              }
              //CAS更新锁对象的对象头,更新成功则表示获取偏向锁
              if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
                if (PrintBiasedLockingStatistics)
                  (* BiasedLocking::rebiased_lock_entry_count_addr())++;
              }
              else {//更新对象头失败,则执行InterpreterRuntime::monitorenter 进而对偏向锁进行撤销再偏向
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
              success = true;
            }
            else {//对象还是可偏向且epoch值相同不需要重偏向,只是占有偏向锁可能被其它线程占有或匿名线程占有(markword中threadId为null)
              //重新计算对象头
              markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
                                                              (uintptr_t)markOopDesc::age_mask_in_place |
                                                              epoch_mask_in_place));
              if (hash != markOopDesc::no_hash) {
                header = header->copy_set_hash(hash);
              }
              markOop new_header = (markOop) ((uintptr_t) header | thread_ident);

              DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
              if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
                if (PrintBiasedLockingStatistics)
                  (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
              }
              else {
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
              success = true;
            }
          }

          //偏向锁失败,则尝试轻量级锁
          if (!success) {
        	  //将对象头设置为无锁
        	markOop displaced = lockee->mark()->set_unlocked();
            entry->lock()->set_displaced_header(displaced);
            bool call_vm = UseHeavyMonitors;
            if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
              // Is it simple recursive case?
              if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
                entry->lock()->set_displaced_header(NULL);
              } else {
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
            }
          }
          UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
        } else {
          istate->set_msg(more_monitors);
          UPDATE_PC_AND_RETURN(0); // Re-execute
        }
      }

InterpreterRuntime:: monitorenter 该方法进入的条件是获取偏向锁失败,如果系统配置使用偏向锁则会执行 ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
  thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
  if (PrintBiasedLockingStatistics) {
    Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
  }
  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be NULL or an object");
  if (UseBiasedLocking) {
    // Retry fast entry if bias is revoked to avoid unnecessary inflation
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
         "must be NULL or an object");
#ifdef ASSERT
  thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
使用偏向锁对场景,通过上文可以知道,线程尝试获取偏向锁失败,在该方法中需要对匿名偏向锁进行撤销后在偏向
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
     //是否使用偏向锁
    if (UseBiasedLocking) {
        //不是出于安全点
    if (!SafepointSynchronize::is_at_safepoint()) {
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      //撤销匿名偏向锁后,再偏向成功就返回
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {//如果是在安全点则取消偏向锁,直接升级为轻量级锁
      assert(!attempt_rebias, "can not rebias toward VM thread");
      //取消偏向锁
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
 }
 //不使用偏向锁,或获取偏向锁失败,则获取轻量级锁
 slow_enter (obj, lock, THREAD) ;
}
 如果开启了偏向锁,是不是每次都需要经过该段逻辑处理呢? 实际上如果该对象竞争比较激烈则不会尝试获取偏向锁的。
JVM 内部为每个类维护一个偏向锁revoke计数器,对其对象进行偏向锁的撤销操作进行计数。当这个值达到指定阈值( BiasedLockingBulkRebiasThreshold,20)的时候,jvm就认为这个类的偏向锁有问题,需要进行重偏向(rebias)。对所有属于这个类的对象进行重偏向的操作叫批量重偏向(bulk rebias)。当需要bulk rebias时,对这个类的epcho值加1,以后分配这个类的对象的时候mark字段里就是这个epoch值了,同时还要对当前已经获得偏向锁的对象的epoch值加1,这些锁数据记录在方法栈里。这样判断这个对象是否获得偏向锁的条件就是:mark字段后3位是101,thread字段跟当前线程相同,epoch字段跟所属类的epoch值相同。如果epoch值不一样,即使thread字段指向当前线程,也是无效的,相当于进行过了rebias,只是没有对对象的mark字段进行更新。如果这个类的revoke计数器继续增加到一个阈值( BiasedLockingBulkRevokeThreshold,40),那个jvm就认为这个类不适合偏向锁了,就要进行bulk revoke。于是多了一个判断条件,要查看所属类的字段,看看是否允许对这个类使用偏向锁。

 //匿名偏向锁,就是偏向锁就是锁标记为101,但是线程ID为null
  //获取对象的markword
  markOop mark = obj->mark();
  //如果是匿名偏向锁同时不尝试获取偏向锁
  if (mark->is_biased_anonymously() && !attempt_rebias) {
    //已经匿名偏向的markword
    markOop biased_value       = mark;
    //创建一个非偏向的markword
    markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
    //CAS更新markword
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
    //更新成功则表示成功取消了匿名偏向锁
    if (res_mark == biased_value) {
      return BIAS_REVOKED;
    }
  } else if (mark->has_bias_pattern()) { //如果是可偏向,即锁标记为101
      //获取对象的Klass
      Klass* k = obj->klass();
      //获取Klass原形头,markword
    markOop prototype_header = k->prototype_header();
    //如果prototype_header中没有偏向锁标记,即该类不可偏向,此时该对象的偏向锁标记为无效标记
    if (!prototype_header->has_bias_pattern()) {
     //已经存在偏向标记的markword,当前对象的markWord
      markOop biased_value       = mark;
      //CAS更新对象头markWord为非偏向锁
      markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
      assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
      return BIAS_REVOKED;
      //如果prototype_header的epoch的值和markword中的epoch值不相等,表明偏向锁已经过期
    } else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
      //如果尝试获取锁
      if (attempt_rebias) {
        assert(THREAD->is_Java_thread(), "");
        markOop biased_value       = mark;
        //重新偏向,在下面会将该方法的源码贴出
        markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
        //CAS更新markword
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
         //CAS更新成功表示获取获取到偏向锁,重偏向成功
        if (res_mark == biased_value) {
          return BIAS_REVOKED_AND_REBIASED;
        }
      } else { //不尝试获取锁,则取消偏向锁
        markOop biased_value       = mark;
        markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED;
        }
      }
    }
  }
  //对象头设置为不可偏向,CAS更新对象头失败都会执行到该处
  //在该方法中会对撤销偏向锁计数进行自增
  HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  if (heuristics == HR_NOT_BIASED) {//不可偏向
    return NOT_BIASED;
  } else if (heuristics == HR_SINGLE_REVOKE) {//可偏向且未到批量处理阈值
    Klass *k = obj->klass();
    markOop prototype_header = k->prototype_header();
    if (mark->biased_locker() == THREAD &&
        prototype_header->bias_epoch() == mark->bias_epoch()) {
      ResourceMark rm;
      log_info(biasedlocking)("Revoking bias by walking my own stack:");
      BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
      ((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
      assert(cond == BIAS_REVOKED, "why not?");
      return cond;
    } else {//执行撤销偏向锁
      VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
      VMThread::execute(&revoke);
      return revoke.status_code();
    }
  }

  //批量重偏向或批量撤销
  VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                (heuristics == HR_BULK_REBIAS),
                                attempt_rebias);
  VMThread::execute(&bulk_revoke);
  return bulk_revoke.status_code();
}
//撤销偏向锁计数自增 HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  if (heuristics == HR_NOT_BIASED) {
    return NOT_BIASED;
  } else if (heuristics == HR_SINGLE_REVOKE) {//
    Klass *k = obj->klass();
    markOop prototype_header = k->prototype_header();

//撤销自己的偏向锁
if (mark->biased_locker() == THREAD &&
prototype_header->bias_epoch() == mark->bias_epoch()) {
ResourceMark rm;
log_info(biasedlocking)("Revoking bias by walking my own stack:");
BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
assert(cond == BIAS_REVOKED, "why not?");
return cond;
} else {
VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
VMThread::execute(&revoke);
return revoke.status_code();
}
}



 static markOop encode(JavaThread* thread, uint age, int bias_epoch) {
    intptr_t tmp = (intptr_t) thread;
    assert(UseBiasedLocking && ((tmp & (epoch_mask_in_place | age_mask_in_place | biased_lock_mask_in_place)) == 0), "misaligned JavaThread pointer");
    assert(age <= max_age, "age too large");
    assert(bias_epoch <= max_bias_epoch, "bias epoch too large");
    //设置了线程ID,epoch,age以及偏向锁标记
    return (markOop) (tmp | (bias_epoch << epoch_shift) | (age << age_shift) | biased_lock_pattern);
  }

static HeuristicsResult update_heuristics(oop o, bool allow_rebias) {
  markOop mark = o->mark();
  if (!mark->has_bias_pattern()) {
    return HR_NOT_BIASED;
  }

 //获取对象Klass
  Klass* k = o->klass();
  //获取当前时间
  jlong cur_time = os::javaTimeMillis();
  //最近一次批量撤销偏向锁的时间
  jlong last_bulk_revocation_time = k->last_biased_lock_bulk_revocation_time();
  //获取撤销偏向次数
  int revocation_count = k->biased_lock_revocation_count();
  //如果撤销偏向次数大于等于批量重偏向次数但是小于批量废止的阈值同时最近一次废止偏向的时间不为0且间隔大于偏向延迟间隔
  if ((revocation_count >= BiasedLockingBulkRebiasThreshold) &&
      (revocation_count <  BiasedLockingBulkRevokeThreshold) &&
      (last_bulk_revocation_time != 0) &&
      (cur_time - last_bulk_revocation_time >= BiasedLockingDecayTime)) {
    //重置偏向锁撤销计数
    k->set_biased_lock_revocation_count(0);
    revocation_count = 0;
  }

  // 撤销偏向计数小于批量废止阈值则废止计数自增
  if (revocation_count <= BiasedLockingBulkRevokeThreshold) {
    revocation_count = k->atomic_incr_biased_lock_revocation_count();
  }
  //如果达到批量撤销阈值
  if (revocation_count == BiasedLockingBulkRevokeThreshold) {
    return HR_BULK_REVOKE;
  }
  //如果达到批量重偏向计数阈值
  if (revocation_count == BiasedLockingBulkRebiasThreshold) {
    return HR_BULK_REBIAS;
  }

  return HR_SINGLE_REVOKE;
}

网易云新用户大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者张伟授权发布。