1. VMThread::inner_execute() - 触发安全点
cpp
复制
void VMThread::inner_execute(VM_Operation* op) { if (op->evaluate_at_safepoint()) { SafepointSynchronize::begin(); // 进入安全点,阻塞所有线程 // ...执行GC等操作... SafepointSynchronize::end(); // 结束安全点,唤醒线程 } }
- 功能:执行需要安全点的 VM 操作(如 GC)。
- 关键点:
SafepointSynchronize::begin()
:暂停所有线程,进入安全点。SafepointSynchronize::end()
:完成 GC 后,调用此函数解除线程阻塞。
2. SafepointSynchronize::end() - 结束安全点
cpp
复制
void SafepointSynchronize::end() { disarm_safepoint(); // 核心:解除安全点 Universe::heap()->safepoint_synchronize_end(); // GC后清理 }
- 功能:安全点结束时的清理工作。
- 核心调用:
disarm_safepoint()
负责恢复线程运行。
3. SafepointSynchronize::disarm_safepoint() - 解除安全点
cpp
复制
void SafepointSynchronize::disarm_safepoint() { _state = _not_synchronized; // 全局状态标记为非同步 Atomic::store(&_safepoint_counter, _safepoint_counter + 1); // 递增安全点ID // 恢复所有线程状态 for (JavaThread *current : JavaThreadIterator()) { current->safepoint_state()->restart(); // 标记线程为运行状态 } _wait_barrier->disarm(); // 唤醒阻塞的线程 Threads_lock->unlock(); // 解锁线程列表 }
- 功能:
- 将全局安全点状态设置为 非同步。
- 更新安全点计数器,触发内存屏障保证可见性。
- 遍历所有线程,调用
restart()
重置线程状态。 - 调用屏障的
disarm()
方法唤醒所有线程。
4. LinuxWaitBarrier::disarm() - 唤醒线程
cpp
复制
void LinuxWaitBarrier::disarm() { _futex_barrier = 0; // 重置屏障值 syscall(SYS_futex, &_futex_barrier, FUTEX_WAKE_PRIVATE, INT_MAX); // 唤醒所有等待线程 }
- 功能:通过 Linux 的
futex
系统调用唤醒所有阻塞在安全点的线程。 - 关键点:
FUTEX_WAKE_PRIVATE
:唤醒所有在_futex_barrier
上等待的线程。INT_MAX
:唤醒最大数量的线程(实际唤醒所有等待的线程)。
5. 线程阻塞与唤醒机制
- 线程阻塞:
- 在安全点开始时,线程通过
SafepointSynchronize::block()
调用futex
的FUTEX_WAIT
进入阻塞状态。
cpp
复制
void SafepointSynchronize::block(JavaThread* thread) { _wait_barrier->wait(active_safepoint_id); // FUTEX_WAIT }
- 在安全点开始时,线程通过
- 线程唤醒:
- GC 完成后,
disarm_safepoint()
调用LinuxWaitBarrier::disarm()
,通过FUTEX_WAKE
唤醒所有阻塞线程。
- GC 完成后,
总结
- 安全点进入:GC 开始时,所有线程通过
futex
进入阻塞状态。 - GC 执行:VM 线程在安全点内执行垃圾回收。
- 安全点退出:
- 更新全局状态和计数器。
- 重置每个线程的运行状态。
- 调用
futex
的FUTEX_WAKE
唤醒所有线程。
- 线程恢复:被唤醒的线程继续执行后续代码。
这些代码是 垃圾回收完成后解除线程阻塞的核心实现,通过操作系统提供的 futex
机制高效地管理线程的阻塞与唤醒。
##源码
void VMThread::inner_execute(VM_Operation* op) {
assert(Thread::current()->is_VM_thread(), "Must be the VM thread");
VM_Operation* prev_vm_operation = NULL;
if (_cur_vm_operation != NULL) {
// Check that the VM operation allows nested VM operation.
// This is normally not the case, e.g., the compiler
// does not allow nested scavenges or compiles.
if (!_cur_vm_operation->allow_nested_vm_operations()) {
fatal("Unexpected nested VM operation %s requested by operation %s",
op->name(), _cur_vm_operation->name());
}
op->set_calling_thread(_cur_vm_operation->calling_thread());
prev_vm_operation = _cur_vm_operation;
}
_cur_vm_operation = op;
HandleMark hm(VMThread::vm_thread());
EventMarkVMOperation em("Executing %sVM operation: %s", prev_vm_operation != NULL ? "nested " : "", op->name());
log_debug(vmthread)("Evaluating %s %s VM operation: %s",
prev_vm_operation != NULL ? "nested" : "",
_cur_vm_operation->evaluate_at_safepoint() ? "safepoint" : "non-safepoint",
_cur_vm_operation->name());
bool end_safepoint = false;
if (_cur_vm_operation->evaluate_at_safepoint() &&
!SafepointSynchronize::is_at_safepoint()) {
SafepointSynchronize::begin();
if (_timeout_task != NULL) {
_timeout_task->arm();
}
end_safepoint = true;
}
evaluate_operation(_cur_vm_operation);
if (end_safepoint) {
if (_timeout_task != NULL) {
_timeout_task->disarm();
}
SafepointSynchronize::end();
}
_cur_vm_operation = prev_vm_operation;
}
// Wake up all threads, so they are ready to resume execution after the safepoint
// operation has been carried out
void SafepointSynchronize::end() {
assert(Threads_lock->owned_by_self(), "must hold Threads_lock");
EventSafepointEnd event;
assert(Thread::current()->is_VM_thread(), "Only VM thread can execute a safepoint");
disarm_safepoint();
Universe::heap()->safepoint_synchronize_end();
SafepointTracing::end();
post_safepoint_end_event(event, safepoint_id());
}
void SafepointSynchronize::disarm_safepoint() {
uint64_t active_safepoint_counter = _safepoint_counter;
{
JavaThreadIteratorWithHandle jtiwh;
#ifdef ASSERT
// A pending_exception cannot be installed during a safepoint. The threads
// may install an async exception after they come back from a safepoint into
// pending_exception after they unblock. But that should happen later.
for (; JavaThread *cur = jtiwh.next(); ) {
assert (!(cur->has_pending_exception() &&
cur->safepoint_state()->is_at_poll_safepoint()),
"safepoint installed a pending exception");
}
#endif // ASSERT
OrderAccess::fence(); // keep read and write of _state from floating up
assert(_state == _synchronized, "must be synchronized before ending safepoint synchronization");
// Change state first to _not_synchronized.
// No threads should see _synchronized when running.
_state = _not_synchronized;
// Set the next dormant (even) safepoint id.
assert((_safepoint_counter & 0x1) == 1, "must be odd");
Atomic::release_store(&_safepoint_counter, _safepoint_counter + 1);
OrderAccess::fence(); // Keep the local state from floating up.
jtiwh.rewind();
for (; JavaThread *current = jtiwh.next(); ) {
// Clear the visited flag to ensure that the critical counts are collected properly.
DEBUG_ONLY(current->reset_visited_for_critical_count(active_safepoint_counter);)
ThreadSafepointState* cur_state = current->safepoint_state();
assert(!cur_state->is_running(), "Thread not suspended at safepoint");
cur_state->restart(); // TSS _running
assert(cur_state->is_running(), "safepoint state has not been reset");
}
} // ~JavaThreadIteratorWithHandle
// Release threads lock, so threads can be created/destroyed again.
Threads_lock->unlock();
// Wake threads after local state is correctly set.
_wait_barrier->disarm();
}
// Guarantees any thread that called wait() will be awake when it returns.
// Provides a trailing fence.
void disarm() {
assert(_owner == Thread::current(), "Not owner thread");
_impl.disarm();
}
// Guarantees any thread that called wait() will be awake when it returns.
// Provides a trailing fence.
void disarm() {
assert(_owner == Thread::current(), "Not owner thread");
_impl.disarm();
}
void LinuxWaitBarrier::disarm() {
assert(_futex_barrier != 0, "Should be armed/non-zero.");
_futex_barrier = 0;
int s = futex(&_futex_barrier,
FUTEX_WAKE_PRIVATE,
INT_MAX /* wake a max of this many threads */);
guarantee_with_errno(s > -1, "futex FUTEX_WAKE failed");
}
static int futex(volatile int *addr, int futex_op, int op_arg) {
return syscall(SYS_futex, addr, futex_op, op_arg, NULL, NULL, 0);
}