Multithreading in Java: Effective Multithreaded Programming

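What is multithreading?

Multithreading is the ability to run multiple threads concurrently within a single program. Each thread is an independent path of execution that shares the process's memory with the other threads, which lets a program keep several CPU cores busy and overlap waiting on I/O with useful work.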

Why use multithreading?

// ❌ Single thread: slow
downloadFile("file1.zip");  // 5 seconds
downloadFile("file2.zip");  // 5 seconds
downloadFile("file3.zip");  // 5 seconds
// Total: 15 seconds

// ✅ Multiple threads: faster
Thread t1 = new Thread(() -> downloadFile("file1.zip"));
Thread t2 = new Thread(() -> downloadFile("file2.zip"));
Thread t3 = new Thread(() -> downloadFile("file3.zip"));
t1.start(); t2.start(); t3.start();
// Total: ~5 seconds if the downloads do not compete for bandwidth

How to create a thread

1. Extend the Thread class

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Thread running: " + Thread.currentThread().getName());
        for (int i = 0; i < 5; i++) {
            System.out.println(i);
            try {
                Thread.sleep(1000); // Sleep for 1 second
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore the flag and stop early
                return;
            }
        }
    }
}

// Usage
MyThread thread = new MyThread();
thread.start(); // Call start(), NOT run(): run() would execute on the calling thread
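The difference between start() and run() is easy to check by recording which thread actually executes the body. The sketch below is only an illustration; the class name `StartVsRun` and the thread names are made up for the example.

```java
class StartVsRun {
    // Returns { thread name seen when run() is called directly,
    //           thread name seen when the thread is started with start() }.
    static String[] observe() {
        String[] seen = new String[2];

        Thread direct = new Thread(() -> seen[0] = Thread.currentThread().getName(), "worker-direct");
        direct.run(); // plain method call: executes on the caller's thread, no new thread is created

        Thread started = new Thread(() -> seen[1] = Thread.currentThread().getName(), "worker-started");
        started.start(); // asks the JVM to create a new thread, which then calls run()
        try {
            started.join(); // join() also makes the write to seen[1] visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("run()   executed on: " + seen[0]);
        System.out.println("start() executed on: " + seen[1]);
    }
}
```

Calling run() directly reports the caller's own thread name, which is why it gives no concurrency at all.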

2. Implement the Runnable interface (recommended)

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Runnable running");
    }
}

// Usage
Thread thread = new Thread(new MyRunnable());
thread.start();

// Or with a lambda (Java 8+)
Thread lambdaThread = new Thread(() -> {
    System.out.println("Lambda runnable");
});
lambdaThread.start();

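Why prefer Runnable?

  • Java has no multiple inheritance of classes, so extending Thread uses up the one superclass slot
  • The task logic stays separate from thread management
  • The task is easier to test, because run() can be called directly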
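The points above can be illustrated with one task object that is run three different ways without changing its code. This is a minimal sketch; `CountingTask` is a hypothetical task invented for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RunnableReuse {
    // Hypothetical task: the logic lives here and knows nothing about threads.
    static class CountingTask implements Runnable {
        private final AtomicInteger runs;
        CountingTask(AtomicInteger runs) { this.runs = runs; }
        @Override public void run() { runs.incrementAndGet(); }
    }

    // Runs the same task directly, on a dedicated thread, and on a pool.
    static int runThreeWays() {
        AtomicInteger runs = new AtomicInteger();
        Runnable task = new CountingTask(runs);
        try {
            task.run();                          // 1. direct call, handy in unit tests

            Thread thread = new Thread(task);    // 2. dedicated thread
            thread.start();
            thread.join();

            ExecutorService pool = Executors.newSingleThreadExecutor();
            pool.execute(task);                  // 3. thread pool
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("Task ran " + runThreeWays() + " times");
    }
}
```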

3. Callable and Future

A Callable returns a result and may throw a checked exception:

import java.util.concurrent.*;

Callable<Integer> task = () -> {
    Thread.sleep(2000);
    return 42;
};

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(task);

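// Do other work...

try {
    Integer result = future.get(); // Blocks until the result is ready
    System.out.println("Result: " + result);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} catch (ExecutionException e) {
    e.getCause().printStackTrace(); // The exception thrown inside the task
}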

executor.shutdown();
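A Future can end in more than one way: with a value, with the task's exception wrapped in ExecutionException, or with a timeout if the caller refuses to wait forever. The sketch below covers the three cases; the class name `FutureOutcomes` and the result strings are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureOutcomes {
    // Describes how a task ended: its value, the exception it threw, or a timeout.
    static String describe(Callable<Integer> task, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> future = executor.submit(task);
        try {
            return "value=" + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The task's own exception is wrapped; getCause() recovers it.
            return "failed=" + e.getCause().getMessage();
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task, the result is no longer wanted
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(() -> 42, 1000));
        System.out.println(describe(() -> { throw new IllegalStateException("boom"); }, 1000));
        System.out.println(describe(() -> { Thread.sleep(2000); return 1; }, 100));
    }
}
```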

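Thread Lifecycle

NEW → RUNNABLE → TERMINATED
         ↓ ↑
       BLOCKED
       WAITING
       TIMED_WAITING

Thread.State has no separate RUNNING value: a thread that is executing on a CPU is reported as RUNNABLE, the same as one that is ready and waiting for the scheduler.

Thread thread = new Thread(() -> {});

System.out.println(thread.getState()); // NEW

thread.start();
// RUNNABLE (until run() returns)

thread.join(); // Wait for the thread to finish
System.out.println(thread.getState()); // TERMINATED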
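The intermediate states are harder to see because an empty task finishes almost immediately. One way to observe them, sketched below, is to park the worker on a CountDownLatch and poll getState() until it reports WAITING. The class name `ThreadStates` is invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class ThreadStates {
    // Records the worker's state before start, while it waits on a latch, and after it ends.
    static List<Thread.State> observe() {
        List<Thread.State> seen = new ArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                release.await(); // parks the thread without a timeout: reported as WAITING
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        seen.add(worker.getState()); // NEW: created but not started
        worker.start();
        try {
            while (worker.getState() != Thread.State.WAITING) {
                Thread.sleep(1); // poll until the worker has parked on the latch
            }
            seen.add(worker.getState()); // WAITING
            release.countDown();         // let the worker finish
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while observing", e);
        }
        seen.add(worker.getState()); // TERMINATED: run() has returned
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

Using `release.await(timeout, unit)` instead would show TIMED_WAITING, and contending for a `synchronized` lock would show BLOCKED.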

Thread Methods

join()

Waits for a thread to finish:

Thread t1 = new Thread(() -> {
    try {
        Thread.sleep(2000);
        System.out.println("T1 done");
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Do not swallow the interrupt
    }
});

t1.start();
t1.join(); // The main thread waits here until t1 finishes
System.out.println("Main continues");

sleep()

Pauses the current thread for a given time:

try {
    Thread.sleep(2000); // Sleep for 2 seconds
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // Restore the interrupt status
}

interrupt()

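Asks a thread to stop what it is doing. interrupt() only sets a flag (or wakes a blocking call with InterruptedException); the target thread has to notice it and exit on its own: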

Thread thread = new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        System.out.println("Working...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore interrupt status
            break;
        }
    }
});

thread.start();
Thread.sleep(3000);
thread.interrupt(); // Ask the thread to stop
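Restoring the interrupt status matters because sleep() clears the flag when it throws; a loop that only checks isInterrupted() would otherwise never see it. The following sketch checks that a worker which would sleep for a minute actually exits soon after being interrupted. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;

class InterruptDemo {
    // Starts a worker that would otherwise sleep for a long time, interrupts it,
    // and reports whether it exited within the timeout.
    static boolean interruptStopsWorker() {
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            started.countDown();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    // sleep() cleared the flag when it threw; set it again so the loop condition sees it
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();
        try {
            started.await();     // make sure the worker is running before interrupting it
            worker.interrupt();
            worker.join(2_000);  // wait up to 2 seconds for the worker to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + interruptStopsWorker());
    }
}
```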

Synchronization

Problem: Race Condition

class Counter {
    private int count = 0;
    
    public void increment() {
        count++; // NOT thread-safe: read, add, and write are three separate steps
    }
    
    public int getCount() {
        return count;
    }
}

// Race condition
Counter counter = new Counter();
Thread t1 = new Thread(() -> {
    for (int i = 0; i < 1000; i++) counter.increment();
});
Thread t2 = new Thread(() -> {
    for (int i = 0; i < 1000; i++) counter.increment();
});

t1.start(); t2.start();
t1.join(); t2.join();

System.out.println(counter.getCount()); // May print less than 2000!

Solution 1: synchronized method

class Counter {
    private int count = 0;
    
    public synchronized void increment() {
        count++; // Thread-safe
    }
    
    public synchronized int getCount() {
        return count;
    }
}

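Solution 2: synchronized block

class Counter {
    private int count = 0;
    private final Object lock = new Object();
    
    public void increment() {
        synchronized (lock) {
            count++;
        }
    }
    
    public int getCount() {
        synchronized (lock) { // Reads need the same lock to see the latest value
            return count;
        }
    }
}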

Solution 3: Atomic classes

import java.util.concurrent.atomic.AtomicInteger;

class Counter {
    private final AtomicInteger count = new AtomicInteger(0);
    
    public void increment() {
        count.incrementAndGet(); // Thread-safe without synchronized
    }
    
    public int getCount() {
        return count.get();
    }
}
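To check that a fix really removes the lost updates, the counters can be hammered from several threads and the final total compared with the expected one. The harness below is a sketch; `CounterCheck`, `SyncCounter`, and `hammer` are names made up for the example, and the counter is passed in as a pair of method references so the same harness works for any of the solutions.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

class CounterCheck {
    // Same idea as the synchronized-method solution above.
    static class SyncCounter {
        private int count = 0;
        synchronized void increment() { count++; }
        synchronized int get() { return count; }
    }

    // Runs `threads` threads that each call `increment` `perThread` times, then reads the total.
    static int hammer(int threads, int perThread, Runnable increment, IntSupplier read) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) increment.run();
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return read.getAsInt();
    }

    public static void main(String[] args) {
        SyncCounter sync = new SyncCounter();
        AtomicInteger atomic = new AtomicInteger();
        System.out.println(hammer(4, 10_000, sync::increment, sync::get));         // 40000
        System.out.println(hammer(4, 10_000, atomic::incrementAndGet, atomic::get)); // 40000
    }
}
```

For a single counter, AtomicInteger is usually the lighter option; `synchronized` becomes necessary once several fields must change together.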

Deadlock

Two threads wait for each other forever:

// ❌ Deadlock example
Object lock1 = new Object();
Object lock2 = new Object();

Thread t1 = new Thread(() -> {
    synchronized (lock1) {
        System.out.println("T1: holding lock1");
        try { Thread.sleep(100); } catch (InterruptedException e) {}
        
        synchronized (lock2) { // Waits for lock2, which T2 holds
            System.out.println("T1: holding lock1 & lock2");
        }
    }
});

Thread t2 = new Thread(() -> {
    synchronized (lock2) {
        System.out.println("T2: holding lock2");
        try { Thread.sleep(100); } catch (InterruptedException e) {}
        
        synchronized (lock1) { // Waits for lock1, which T1 holds
            System.out.println("T2: holding lock1 & lock2");
        }
    }
});

t1.start();
t2.start();
// DEADLOCK! Both threads are blocked forever

Avoiding deadlock: always acquire locks in a consistent order:

// ✅ Always lock in the order lock1 → lock2
synchronized (lock1) {
    synchronized (lock2) {
        // Safe
    }
}
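When the two locks are not fixed variables but depend on the arguments, a consistent order has to be derived from the objects themselves. A common approach, sketched here, is to order by a unique id. The `Account` class and its `id` field are hypothetical, and the sketch assumes the two accounts are distinct and have different ids.

```java
class OrderedTransfer {
    static class Account {
        final int id;   // hypothetical unique id, used only to decide lock order
        long balance;
        Account(int id, long balance) { this.id = id; this.balance = balance; }
    }

    // Always locks the account with the smaller id first, whatever the transfer direction.
    static void transfer(Account from, Account to, long amount) {
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // Two threads transfer in opposite directions; returns the combined balance afterwards.
    static long runOpposingTransfers(int rounds) {
        Account a = new Account(1, 1_000);
        Account b = new Account(2, 1_000);
        Thread t1 = new Thread(() -> { for (int i = 0; i < rounds; i++) transfer(a, b, 1); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < rounds; i++) transfer(b, a, 1); });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return a.balance + b.balance; // money is neither created nor lost
    }

    public static void main(String[] args) {
        System.out.println("Total after transfers: " + runOpposingTransfers(10_000));
    }
}
```

Locking `from` then `to` directly would recreate the deadlock above as soon as two threads transfer in opposite directions.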

ExecutorService

A framework for managing thread pools efficiently:

Fixed Thread Pool

ExecutorService executor = Executors.newFixedThreadPool(5);

for (int i = 0; i < 10; i++) {
    final int taskId = i;
    executor.submit(() -> {
        System.out.println("Task " + taskId + " by " + 
            Thread.currentThread().getName());
    });
}

executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
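When the tasks produce values, `invokeAll` submits a whole batch and returns the futures in submission order. The sketch below squares a list of numbers on a fixed pool; the class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolSum {
    // Squares each input on a fixed-size pool and sums the results.
    static int sumOfSquares(List<Integer> inputs, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs) {
                tasks.add(() -> n * n);
            }
            int sum = 0;
            // invokeAll blocks until every task is done; futures are in the same order as tasks
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3, 4), 2)); // 30
    }
}
```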

Cached Thread Pool

Creates threads on demand:

ExecutorService executor = Executors.newCachedThreadPool();
// Creates new threads as needed and reuses idle ones

Single Thread Executor

Guarantees that tasks run one at a time, in submission order:

ExecutorService executor = Executors.newSingleThreadExecutor();
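The ordering guarantee can be checked by letting each task append its id to a list. This is a small sketch with a made-up class name; the list is wrapped with `Collections.synchronizedList` so the caller can safely read it afterwards.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SequentialExecutor {
    // Submits `tasks` tasks and returns the order in which they actually ran.
    static List<Integer> runInOrder(int tasks) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            executor.execute(() -> order.add(id));
        }
        executor.shutdown(); // already submitted tasks still run
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```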

Scheduled Thread Pool

Runs tasks after a delay or at a fixed rate:

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// Run once, 2 seconds from now
scheduler.schedule(() -> {
    System.out.println("Executed after 2s");
}, 2, TimeUnit.SECONDS);

// Run every second, starting immediately
scheduler.scheduleAtFixedRate(() -> {
    System.out.println("Tick");
}, 0, 1, TimeUnit.SECONDS);
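A fixed-rate task keeps running until it is cancelled or the scheduler is shut down, so real code needs a way to stop it. The sketch below keeps the ScheduledFuture returned by `scheduleAtFixedRate` and cancels it after a given number of ticks; `TickerDemo` and `runTicks` are names invented for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TickerDemo {
    // Ticks every periodMillis until at least `ticks` ticks have happened, then cancels the schedule.
    // Returns the number of ticks that ran, which may be slightly more than requested.
    static int runTicks(int ticks, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger count = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            count.incrementAndGet();
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS); // wait for the requested number of ticks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handle.cancel(false); // stop future runs without interrupting a tick in progress
            scheduler.shutdown();
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println("Ticks: " + runTicks(3, 100));
    }
}
```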

Concurrent Collections

Thread-safe collections:

// ConcurrentHashMap
Map<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);

// CopyOnWriteArrayList
List<String> list = new CopyOnWriteArrayList<>();
list.add("item");

// BlockingQueue
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
queue.put("item"); // Blocks if the queue is full (this queue is unbounded, so in practice it never blocks)
String item = queue.take(); // Blocks if the queue is empty
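A thread-safe map does not make a sequence of calls atomic: `get` followed by `put` can still lose updates. For counters, `merge` performs the read and the write as one atomic step per key. The sketch below counts words from several threads; the class name `WordCount` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WordCount {
    // Every thread counts the same word list; merge() makes each per-key update atomic.
    static Map<String, Integer> count(List<String> words, int threads) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (String word : words) {
                    counts.merge(word, 1, Integer::sum);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while counting", e);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(Arrays.asList("a", "b", "a"), 4);
        System.out.println("a=" + counts.get("a") + ", b=" + counts.get("b")); // a=8, b=4
    }
}
```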

CompletableFuture (Java 8+)

Modern asynchronous programming:

// Async task
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Keep the interrupt visible to the pool
    }
    return "Result";
});

// Chain operations
future.thenApply(result -> result.toUpperCase())
      .thenAccept(result -> System.out.println(result))
      .exceptionally(ex -> {
          System.err.println("Error: " + ex);
          return null;
      });

// Combine multiple futures
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);

CompletableFuture<Integer> combined = future1.thenCombine(future2, (a, b) -> a + b);
System.out.println(combined.get()); // 30
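`thenCombine` joins two futures; for a variable number, `CompletableFuture.allOf` waits for all of them. The sketch below fans out one lookup per id and substitutes 0 for any lookup that fails. The `price` function and its failure rule are hypothetical, chosen only to exercise the fallback path.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FanOut {
    // Hypothetical lookup: fails for negative ids so the fallback path is exercised.
    static int price(int id) {
        if (id < 0) throw new IllegalArgumentException("bad id " + id);
        return id * 10;
    }

    // Starts one async lookup per id and sums the results; failed lookups count as 0.
    static int totalPrice(List<Integer> ids) {
        List<CompletableFuture<Integer>> futures = ids.stream()
            .map(id -> CompletableFuture.supplyAsync(() -> price(id))
                                        .exceptionally(ex -> 0))
            .collect(Collectors.toList());
        // allOf completes once every future in the array has completed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().mapToInt(CompletableFuture::join).sum();
    }

    public static void main(String[] args) {
        System.out.println(totalPrice(Arrays.asList(1, 2, -3))); // 30
    }
}
```

Unlike `get()`, `join()` throws only unchecked exceptions, which keeps it usable inside stream pipelines.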

Real-world examples

1. Downloading multiple files in parallel

public class ParallelDownloader {
    private final ExecutorService executor = Executors.newFixedThreadPool(5);
    
    public void downloadFiles(List<String> urls) {
        List<Future<String>> futures = new ArrayList<>();
        
        for (String url : urls) {
            Future<String> future = executor.submit(() -> downloadFile(url));
            futures.add(future);
        }
        
        // Wait for every download to finish
        for (Future<String> future : futures) {
            try {
                String result = future.get();
                System.out.println("Downloaded: " + result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        executor.shutdown();
    }
    
    private String downloadFile(String url) throws InterruptedException {
        Thread.sleep(2000); // Simulate download
        return url;
    }
}
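One caveat with the class above: the pool is an instance field that is shut down at the end of downloadFiles, so a second call on the same object would be rejected. A possible variation, sketched below, creates the pool per call and shuts it down in a `finally` block using the two-phase pattern described in the ExecutorService documentation. `fetch` is a hypothetical stand-in that only sleeps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class DownloadAll {
    // Hypothetical stand-in for a real download; returns the URL it was given.
    static String fetch(String url) throws InterruptedException {
        Thread.sleep(50);
        return url;
    }

    static List<String> downloadAll(List<String> urls) {
        ExecutorService pool = Executors.newFixedThreadPool(5); // one pool per call
        List<String> results = new ArrayList<>();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String url : urls) {
                futures.add(pool.submit(() -> fetch(url)));
            }
            for (Future<String> future : futures) {
                try {
                    results.add(future.get());
                } catch (ExecutionException e) {
                    System.err.println("Download failed: " + e.getCause());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            shutdownAndAwait(pool); // runs on success and on failure
        }
        return results;
    }

    // Graceful shutdown first; forced shutdown if tasks do not finish in time.
    static void shutdownAndAwait(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(downloadAll(Arrays.asList("a.zip", "b.zip", "c.zip")));
    }
}
```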

2. Producer-Consumer with BlockingQueue

public class ProducerConsumer {
    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    System.out.println("Producing: " + i);
                    queue.put(i); // Blocks if the queue is full
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Preserve the interrupt and exit
            }
        }
    }
    
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    Integer item = queue.take(); // Blocks if the queue is empty
                    System.out.println("Consuming: " + item);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Interrupting is the only way out of this loop
            }
        }
    }
    
    public void start() {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }
}
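The consumer above never terminates on its own, because take() blocks forever once the producer is done. One common way to end it cleanly is a "poison pill": a sentinel value the producer enqueues last. The sketch below assumes `Integer.MIN_VALUE` is never a real item; that sentinel and the class name are choices made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    private static final Integer POISON = Integer.MIN_VALUE; // hypothetical sentinel, never a real item

    // Produces 0..items-1, consumes them on another thread, and returns what was consumed.
    static List<Integer> produceAndConsume(int items) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        List<Integer> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) queue.put(i);
                queue.put(POISON); // tell the consumer that nothing more is coming
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer item = queue.take();
                    if (item.equals(POISON)) break; // clean exit instead of blocking forever
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join() makes the consumer's writes visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(5)); // [0, 1, 2, 3, 4]
    }
}
```

With several consumers, the producer would need to enqueue one pill per consumer.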

3. Parallel Stream

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Sequential
int sum = numbers.stream()
    .map(n -> n * 2)
    .reduce(0, Integer::sum);

// Parallel
int parallelSum = numbers.parallelStream()
    .map(n -> n * 2)
    .reduce(0, Integer::sum);
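Parallel streams are only safe when the pipeline does not mutate shared state. Adding to a plain ArrayList from `forEach` in a parallel stream is a race; letting the stream accumulate with `collect` or `sum` avoids it. A minimal sketch, with invented class and method names:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamSafety {
    // Safe: the collector does the accumulation, so no shared mutable list is involved.
    // The unsafe variant would be: parallel().forEach(x -> someArrayList.add(x * 2)).
    static List<Integer> doubled(int n) {
        return IntStream.rangeClosed(1, n)
            .parallel()
            .map(x -> x * 2)
            .boxed()
            .collect(Collectors.toList()); // keeps encounter order even in parallel
    }

    // Safe: sum() is a built-in reduction that combines per-thread partial sums.
    static long sumDoubled(int n) {
        return IntStream.rangeClosed(1, n)
            .parallel()
            .mapToLong(x -> x * 2L)
            .sum();
    }

    public static void main(String[] args) {
        System.out.println(doubled(5));     // [2, 4, 6, 8, 10]
        System.out.println(sumDoubled(10)); // 110
    }
}
```

For a list as small as the ten numbers above, the parallel version is unlikely to be faster; the splitting overhead tends to pay off only for large inputs or expensive per-element work.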

4. Caching a single instance with Double-Checked Locking

public class Singleton {
    private static volatile Singleton instance;
    
    private Singleton() {}
    
    public static Singleton getInstance() {
        if (instance == null) { // First check (no locking)
            synchronized (Singleton.class) {
                if (instance == null) { // Second check (with locking)
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}
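Double-checked locking only works because `instance` is volatile. When the goal is simply a lazily created singleton, the initialization-on-demand holder idiom is a simpler alternative that relies on the JVM's class-initialization guarantees instead of explicit locking. This is a different technique from the one above, shown as a sketch; `HolderSingleton` is a made-up name.

```java
class HolderSingleton {
    private HolderSingleton() {}

    // The JVM initializes Holder lazily and thread-safely on the first access to Holder.INSTANCE.
    private static class Holder {
        static final HolderSingleton INSTANCE = new HolderSingleton();
    }

    static HolderSingleton getInstance() {
        return Holder.INSTANCE;
    }

    // Calls getInstance() from several threads and reports whether all saw the same object.
    static boolean sameInstanceAcrossThreads(int threads) {
        HolderSingleton[] seen = new HolderSingleton[threads];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int slot = i;
            workers[i] = new Thread(() -> seen[slot] = getInstance());
            workers[i].start();
        }
        try {
            for (Thread worker : workers) worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        for (HolderSingleton instance : seen) {
            if (instance != seen[0]) return false;
        }
        return seen[0] != null;
    }

    public static void main(String[] args) {
        System.out.println("Same instance: " + sameInstanceAcrossThreads(8));
    }
}
```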

Best Practices

  1. Use ExecutorService instead of creating threads by hand
// ❌ BAD
new Thread(() -> doWork()).start();

// ✅ GOOD
executor.submit(() -> doWork());
  2. Always shut down the ExecutorService
try {
    // Submit tasks
} finally {
    executor.shutdown();
    executor.awaitTermination(60, TimeUnit.SECONDS);
}
  3. Use concurrent collections for shared data
// ✅ Thread-safe
Map<String, Integer> safeMap = new ConcurrentHashMap<>();

// ❌ Not thread-safe
Map<String, Integer> unsafeMap = new HashMap<>();
  4. Avoid over-synchronizing
// ❌ Locks the whole method
public synchronized void doWork() {
    // Expensive operation
}

// ✅ Locks only what needs it
public void doWork() {
    // Non-critical code
    synchronized (lock) {
        // Critical section only
    }
}
  5. Use volatile for flags
private volatile boolean running = true;

public void stop() {
    running = false; // Visible to all threads
}
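The volatile flag is typically paired with a loop in the worker that re-reads it on every iteration. A minimal sketch of that pairing follows; `StoppableWorker` and `startAndStop` are names invented for the example, and the loop body is a placeholder for real work.

```java
class StoppableWorker implements Runnable {
    private volatile boolean running = true; // volatile: the write in stop() becomes visible to run()
    private long iterations = 0;             // touched only by the worker thread

    @Override
    public void run() {
        while (running) {   // re-reads the flag on every iteration
            iterations++;   // placeholder for real work
        }
    }

    public void stop() {
        running = false;
    }

    // Starts a worker, asks it to stop, and reports whether it exited within 2 seconds.
    static boolean startAndStop() {
        StoppableWorker worker = new StoppableWorker();
        Thread thread = new Thread(worker);
        thread.start();
        worker.stop();
        try {
            thread.join(2_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + startAndStop());
    }
}
```

A flag like this cannot wake a thread that is blocked in sleep() or take(); for those cases interrupt() is the better tool.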

Conclusion

Multithreading in Java:

  • Improves performance through parallel processing
  • Requires care to avoid race conditions and deadlocks
  • ExecutorService manages threads efficiently
  • Concurrent collections provide thread-safe shared data
  • CompletableFuture supports modern asynchronous programming

A solid grasp of multithreading helps you build scalable, high-performance Java applications.