java多线程基础 java多线程
使用多线程的原因:
- CPU的多核: 一个线程在一个时刻只能运行在一个处理器核心上, 多线程可以提高对多核的利用率;
- I/O阻塞: I/O操作过多的程序, 单线程CPU利用率低;
- 为了更快的相应: 对数据一致性要求不强的操作交给其他线程处理, 响应更快;
- java具有良好的多线程编程模型;
多线程基础
1. 查看java程序中包含的线程
@Test
public void testThread(){
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false,false);
for(ThreadInfo info : threadInfos){
System.out.println(info.getThreadId() + " | " + info.getThreadName());
}
}
--- 输出:
6 | Monitor Ctrl-Break
5 | Attach Listener
4 | Signal Dispatcher
3 | Finalizer
2 | Reference Handler
1 | main
2. 优先级 守护线程
- 设置优先级
java线程中, 默认优先级为5, 优先级范围为1-10; 优先级设置建议: 对频繁阻塞(休眠或I/O)的线程设置较高的优先级, 而偏重计算的(需要较多CPU时间或者偏运算)的线程设置较低的优先级, 确保处理器不会被独占.
public final void setPriority(int newPriority) {
ThreadGroup g;
checkAccess();
if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) {
throw new IllegalArgumentException();
}
if((g = getThreadGroup()) != null) { // 线程组为空,则不修改线程优先级
if (newPriority > g.getMaxPriority()) {
newPriority = g.getMaxPriority();
}
setPriority0(priority = newPriority);
}
}
- 守护线程
Daemon线程主要用于程序中后台调度以及支持性工作, 例如gc线程, Finalizer线程;
jvm中没有非Daemon线程, 虚拟机就会退出, Daemon线程会立即终止;
@Test
public void testDaemon() throws InterruptedException {
Thread th1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Im is a Daemon Thread");
}
});
th1.setDaemon(true);
th1.start();
System.out.println(Thread.currentThread().getName() + " running");
Thread.sleep(1000L);
}
3. java线程状态
java线程运行生命周期中的6中状态:
状态名称 | 说明 |
---|---|
NEW | 初始状态, 线程被创建,还没有调用start()方法 |
RUNNABLE | 运行状态, java线程将操作系统中的就绪与运行统称为运行中 |
BLOCKED | 阻塞状态, 表示线程阻塞与锁, 例如获取锁后一直sleep |
WAITING | 等待状态, 表示当前线程需要等待其他线程做一些特定的动作(通知或中断) |
TIME_WAITING | 超时等待状态, 在指定时间内自行返回, 不再等待下去 |
TERMINATED | 终止状态, 表示线程已经执行完毕 |
4. 启动线程
- 构造线程
线程运行之前首先要构造一个线程对象, 线程对象在构造的时候需要提供线程所需要的属性;
线程初始化过程如下:
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}
// 初始化线程属性: 线程组, 优先级, 是否Daemon
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name.toCharArray();
Thread parent = currentThread(); // 取当前线程为parent,
SecurityManager security = System.getSecurityManager(); // 获取java安全管理器
if (g == null) {
/* Determine if it's an applet or not */
/* If there is a security manager, ask the security manager
what to do. */
if (security != null) {
g = security.getThreadGroup();
}
/* If the security doesn't have a strong opinion of the matter
use the parent thread group. */
if (g == null) {
g = parent.getThreadGroup();
}
}
/* checkAccess regardless of whether or not threadgroup is
explicitly passed in. */
g.checkAccess();
/*
* Do we have the required permissions?
*/
if (security != null) {
if (isCCLOverridden(getClass())) {
security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
}
}
g.addUnstarted();
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
if (parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;
/* Set thread ID */
tid = nextThreadID();
}
由init()方法可知, 构造的线程对象是有其parent线程来进行空间分配的, 而child线程继承了parent(以当前线程作为新构造线程的parent)属性.
- 调用start()方法启动线程, start()方法的含义: 当前线程(即: parent线程)同步告知java虚拟机.
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0();
5. 终止线程
- 使用过期的方法终止线程
不建议使用, 原因:
- suspend()方法在调用后, 线程不会释放已经占有的资源(比如锁),而是占有着资源进入睡眠, 这样容易引起死锁;
- stop()方法终结一个线程是不会保证线程资源正常释放, 通常是没有给予线程完成资源释放的机会, 而导致程序可能工作在不确定状态下;
- 由于suspend()、resume()、 stop()方法带来的副作用, 不建议使用;
@Deprecated
public final void suspend() { // 暂停
checkAccess();
suspend0();
}
@Deprecated
public final void resume() { // 恢复
checkAccess();
resume0();
}
@Deprecated
public final void stop() { // 停止
stop(new ThreadDeath());
}
@Deprecated
public final synchronized void stop(Throwable obj) {
if (obj == null)
throw new NullPointerException();
SecurityManager security = System.getSecurityManager();
if (security != null) {
checkAccess();
if ((this != Thread.currentThread()) ||
(!(obj instanceof ThreadDeath))) {
security.checkPermission(SecurityConstants.STOP_THREAD_PERMISSION);
}
}
// A zero status value corresponds to "NEW", it can't change to
// not-NEW because we hold the lock.
if (threadStatus != 0) {
resume(); // Wake up thread if it was suspended; no-op otherwise
}
// The VM can handle all thread states
stop0(obj);
}
- 使用中断
中断相关的三个方法:
- interrupt(): 对线程设置中断标识, 通过抛出InterruptException异常中断线程;
- interrupted(): 返回当前线程中断状态,并将当前线程中断状态复位(置为false);
- isInterrupted(): 返回线程中断状态, 声明抛出InterruptException的方法在抛出InterruptException之前, jvm会先将该线程的中断标识位清楚, 然后抛出InterruptException, 此时调用isInterrupt()方法将返回false;
源码分析:
// 对线程设置中断标识, 即中断状态设置为true
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
// 返回当前线程中断状态,并将当前线程中断状态复位(即:设置为false)。
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
// 返回线程的中断状态, 但并不对线程中断状态复位;
public boolean isInterrupted() {
return isInterrupted(false);
}
/**
* Tests if some Thread has been interrupted. The interrupted state
* is reset or not based on the value of ClearInterrupted that is
* passed.
*/
private native boolean isInterrupted(boolean ClearInterrupted);
代码示例:
public class ThreadTest {
@Test
public void testInterrupt() throws InterruptedException {
Thread th1 = new Thread(new SleepRunner());
th1.start();
Thread.sleep(2000L);
th1.interrupt();
System.out.println("th1 status | " + th1.isInterrupted());
Thread th2 = new Thread(new BusyRunner());
th2.start();
th2.interrupt();
System.out.println("th2 status | " + th2.isInterrupted());
th2.join();
System.out.println("执行结束");
}
static class SleepRunner implements Runnable{
@Override
public void run() {
try{
while(true) {
Thread.sleep(1000);
}
}catch (Exception e){
System.out.println("throw Exception");
}
}
}
static class BusyRunner implements Runnable{
@Override
public void run() {
while(true){
if(Thread.currentThread().isInterrupted()){
return;
}
}
}
}
}
-- 输出结果:
throw Exception
th1 status | false // 中断标识重置了
th2 status | true // 中断标识未重置
执行结束
-
安全终止线程
利用一个boolean变量控制是否需要体质任务并进行中断操作和停止操作。
代码示例:
public class SecurityInterruptThread {
@Test
public void testSecurityInterruptThread() throws InterruptedException {
Thread th1 = new Thread(new Runner(), "th1");
Runner run = null;
Thread th2 = new Thread(run = new Runner(), "th2");
th1.start();
th2.start();
Thread.sleep(1000);
th1.interrupt();
run.cancel();
th1.join();
th2.join();
}
static class Runner implements Runnable{
private long i;
private volatile boolean on = true;
@Override
public void run() {
// 通过中断 标识 和 on 来中断线程
while(on && !Thread.currentThread().isInterrupted()){
i++;
}
System.out.println("count i = " + i);
}
public void cancel(){
on = false; // on置为false, 中断线程
}
}
}
6. 线程间通信
java线程间通信方式:
- 等待/通知机制(wait/notify)
- 管道输入/输出流(PipedReader/PipedWriter)
- Thread.join() 等待线程结束
- ThreadLocal 线程变量
1. 等待/通知机制
- notify(): 随机通知通一个wait在该锁对象的线程;
- notifyAll(): 通知所有wait在该所对象上的线程;
- wait(): 锁对象调用此方法, 当前线程进入WAITING状态, 只有被其他线程唤醒或者被中断才会返回, 调用wait()会释放锁对象.
- wait(long): 超时等待一段时间,单位毫秒; 超时返回;
- wait(long, int): 可以精确到纳秒;
注意: 锁对象调用notify() 或者notifyAll() 方法只是唤醒等待该所对象的线程, 但是被唤醒的线程需要等到所锁对象被释放后并且抢到锁对象才能执行.
WaitThread先获得对象锁, 调用对象锁wait方法释放对象锁, 进入等待队列, 处于WAITING状态; NotifyThread等到WaitThread释放了对象锁后获取对象锁, 调用对象锁notify()或者notifyAll(), WaitThread线程从等待队列进入到同步队列, 变成BLOCKED状态。NotifyThread释放锁之后, WaitThread再次获取锁并从wait(0方法返回继续执行。
代码示例
public class WaitNotifyTest {
static Object lock = new Object();
static boolean flag = true;
public static void main(String[] args) throws InterruptedException {
Thread th1 = new Thread(new Wait(), "WaitThread");
Thread th2 = new Thread(new Notify(), "NotifyThread");
th1.start();
Thread.sleep(1000L);
th2.start();
}
static class Wait implements Runnable{
@Override
public void run() {
synchronized (lock){
while(flag){
try {
System.out.println(Thread.currentThread().getName() + ": to Wait");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ": Wait end");
}
}
}
static class Notify implements Runnable{
@Override
public void run() {
synchronized (lock){
System.out.println(Thread.currentThread().getName() + ": to Notify");
lock.notifyAll();
flag = false;
System.out.println(Thread.currentThread().getName() + ": Notify end");
}
}
}
}
-- 输出:
WaitThread: to Wait
NotifyThread: to Notify
NotifyThread: Notify end
WaitThread: Wait end
2. 管道输入/输出流
管道输入/输出流 与 普通文件输入输出 或者 网络输入输出流的区别在于, 它主要用于线程之间的数据传输, 而传输媒介为内存.
管道输入输出主要4中实现如下:
- PipedOutputStream
- PipedInputStream
- PipedReader
- PipedWriter
前两种面向字节, 后两种面向字符
代码示例
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
public class Piped {
public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输出流与输入流进行连接, 否则使用时会抛出IOException
out.connect(in);
Thread th1 = new Thread(new Print(in), "PrintThread");
th1.start();
int receive = 0;
try{
while((receive = System.in.read()) != -1){
out.write(receive);
}
}finally {
out.close();
}
}
static class Print implements Runnable{
private PipedReader in;
public Print(PipedReader in){
this.in = in;
}
@Override
public void run() {
int receive = 0;
try {
while((receive = in.read()) != -1){
System.out.print((char) receive);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
-- 示例输入: 控制台输入 hello world, th1线程输出 hello world
hello world
hello world
3. join()
join等待线程结束后才向下执行; 源码内部通过一个native方法isAlive()判断线程是否还存活, 根据isAlive()源码上的解释可知, 只有当线程已经被启动且还没有结束, 才会被认为处于存活状态; 因此当线程还未启动,调用join是不会有作用的,会直接向下运行;
join内部使用wait(0)实现, 因为调用join方法的为当前线程, 所以让当前线程等待.
join源码
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
/**
* Tests if this thread is alive. A thread is alive if it has
* been started and has not yet died.
*
* @return <code>true</code> if this thread is alive;
* <code>false</code> otherwise.
*/
public final native boolean isAlive();
代码示例: 创建5个线程, 每个线程等待前一个线程执行完后才执行;
public class JoinTest {
public static void main(String[] args) {
Thread previous = Thread.currentThread();
for(int i = 0; i < 5; i++){
Thread cur = new Thread(new Domino(previous), "thread-" + String.valueOf(i));
cur.start();
previous = cur;
}
System.out.println(Thread.currentThread().getName() + " terminated");
}
static class Domino implements Runnable{
private Thread thread;
public Domino(Thread thread){
this.thread = thread;
}
@Override
public void run() {
try {
thread.join();
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " terminated");
}
}
}
---输出:
main terminated
thread-0 terminated
thread-1 terminated
thread-2 terminated
thread-3 terminated
thread-4 terminated
以当前线程对象为锁, 测试调用join()
结果存疑??? 待以后解决
- 可能原因, join内部使用wait(0) , 释放锁资源后, 会唤醒等待的线程, 然后竞争锁资源。(不确定, 个人猜测)
/**
* TestJoin中以当前线程对象为锁, 测试调用join()
* 测试消耗时间为3011左右, 结果存疑??
* @throws InterruptedException [description]
*/
@Test
public void testJoin() throws InterruptedException {
long start = System.currentTimeMillis();
Thread th1 = new Thread(new TestJoin(), "th1");
th1.start();
//Thread.sleep(100L);
th1.join(1000L);
/*synchronized (th1){
System.out.println(System.currentTimeMillis() - start);
System.out.println(Thread.currentThread().getName() + " th1 wait");
th1.wait(1000L);
}*/
System.out.println(System.currentTimeMillis() - start);
}
static class TestJoin implements Runnable{
@Override
public void run() {
synchronized (Thread.currentThread()){
//synchronized (TestJoin.class){
try {
System.out.println(Thread.currentThread().getName() + " sleep");
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " end");
}
}
}
-- 测试结果:
th1 sleep
th1 end
3011
4. ThreadLocal的使用
- ThreadLocal为线程变量, Thread对象存在一个ThreadLocalMap变量, 以ThreadLocal为key, 任意对象为值;
ThreadLocal 源码分析
/**
* 获取当前线程的ThreadLocalMap变量, 然后以ThreadLocal对象为键, 获取对应的value值
* 若当前线程ThreadLocalMap变量为空, 则为其创建一个map, 并返回initialValue()的值, 默认为null
* @return [description]
*/
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t); // 获取当前线程的ThreadLocalMap变量
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
/**
* 获取当前线程ThreadLocalMap变量, 以ThreadLocal对象为键, value为值, 添加到map中
* 若ThreadLocalMap为空, 则调用createMap为当前线程ThreadLocalMap赋值,并将数据添加进去
* @param value [description]
*/
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
/**
* 移除以ThreadLocal对象为键的entry
*/
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
private T setInitialValue() {
T value = initialValue(); // 默认返回null, 可以通过子类重写返回自定义默认值
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
- ThreadLocal内存泄漏问题
/**
* ThreadLocal内部维护一个ThreadLocalMap内部类
*/
static class ThreadLocalMap {
/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}
由ThreadLocalMap源码可知, entry节点继承WeakReference<ThreadLocal<?>>, 键ThrealLocal为弱引用, 在下次gc时就会被回收掉; 因此ThreadLocalMap中就会出现key为null的Entry, 无法访问key为null的Entry中的value, 而当前线程声明周期很长时, 就有可能会造成内存泄漏.
key使用弱引用而不用强引用的原因:
- key若使用强引用: 引用的ThreadLocal的对象可以被回收时,但是ThreadLocalMap还持有ThreadLocal的强引用,如果没有手动删除,ThreadLocal不会被回收,导致Entry内存泄漏。
- key若使用弱引用: 引用的ThreadLocal的对象可以被回收时,由于ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal也会被回收。value在下一次ThreadLocalMap调用set,get,remove的时候会被清除。
由于ThreadLocalMap
的生命周期跟Thread
一样长,如果都没有手动删除对应key
,都会导致内存泄漏,但是使用弱引用可以多一层保障:弱引用ThreadLocal不会内存泄漏,对应的value在下一次ThreadLocalMap调用set,get,remove的时候会被清除。
ThreadLocal 防止内存泄漏:
- 每次使用完ThreadLocal后, 都调用它的remove() 方法, 清楚数据;
在使用线程池的情况下,没有及时清理ThreadLocal,不仅是内存泄漏的问题,更严重的是可能导致业务逻辑出现问题。所以,使用ThreadLocal就跟加锁完要解锁一样,用完就清理。