Exchanger در جاوا – به زبان ساده
در این مطلب به بررسی <java.util.concurrent.Exchanger<T خواهیم پرداخت. این ساختار به منزله نقطه مشترکی بین دو نخ (Thread) جاوا برای مبادله اشیا بین آنها عمل میکند. برای آشنایی با Exchanger در جاوا تا انتهای این راهنما با ما همراه باشید.
آشنایی با Exchanger در جاوا
کلاس Exchanger در جاوا میتواند برای اشترک اشیا بین دو نخ از نوع T استفاده شود. این کلاس تنها یک متد منفرد overload-شده به نام exchange(T t) ارائه میکند. زمانی که این متد فراخوانی شود، منتظر نخ دیگر میماند تا آن نیز فراخوانی شود. در این نقطه نخ دوم متوجه میشود که نخ اول با شیء آن منتظر مانده است. این نخ اشیایی را که نگهداری میکند، مبادله کرده و سیگنالی برای مبادله ارسال میکند تا بتوانند بازگشت یابند.
در ادامه مثالی از این موضوع را مشاهده میکنید که به درک بهتر کاربرد پیام مبادله بین دو نخ با استفاده از Exchanger کمک میکند:
1@Test
2public void givenThreads_whenMessageExchanged_thenCorrect() {
3 Exchanger<String> exchanger = new Exchanger<>();
4
5 Runnable taskA = () -> {
6 try {
7 String message = exchanger.exchange("from A");
8 assertEquals("from B", message);
9 } catch (InterruptedException e) {
10 Thread.currentThread.interrupt();
11 throw new RuntimeException(e);
12 }
13 };
14
15 Runnable taskB = () -> {
16 try {
17 String message = exchanger.exchange("from B");
18 assertEquals("from A", message);
19 } catch (InterruptedException e) {
20 Thread.currentThread.interrupt();
21 throw new RuntimeException(e);
22 }
23 };
24 CompletableFuture.allOf(
25 runAsync(taskA), runAsync(taskB)).join();
26}
در مثال فوق، دو نخ داریم که پیامها را بین همدیگر با استفاده از exchanger مشترک مبادله میکنند. در ادامه مثال دیگری را میبینید که در آن یک شیء را از نخ اصلی با یک نخ جدید مبادله میکنیم:
1@Test
2public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException {
3 Exchanger<String> exchanger = new Exchanger<>();
4
5 Runnable runner = () -> {
6 try {
7 String message = exchanger.exchange("from runner");
8 assertEquals("to runner", message);
9 } catch (InterruptedException e) {
10 Thread.currentThread.interrupt();
11 throw new RuntimeException(e);
12 }
13 };
14 CompletableFuture<Void> result
15 = CompletableFuture.runAsync(runner);
16 String msg = exchanger.exchange("to runner");
17 assertEquals("from runner", msg);
18 result.join();
19}
توجه کنید که ابتدا باید نخ runner را اجرا کنیم و سپس ()exchange را در نخ اصلی فراخوانی نماییم.
همچنین توجه کنید در صورتی که نخ دوم در زمان مورد نظر به نقطه تبادل نرسد، فراخوانی نخ اول ممکن است timeout شود. این که نخ اول باید چه قدر صبر کند، با استفاده از متد overload-شده exchange(T t, long timeout, TimeUnit timeUnit) تنظیم میشود.
مبادله داده بدون GC
Exchanger میتواند برای ایجاد الگوهای شبیه pipeline با ارسال داده از یک نخ به نخ دیگر استفاده شود. در این بخش یک پشته ساده از نخها ایجاد میکنیم که به صورت پیوسته دادهها را بین همدیگر به صورت یک pipeline ارسال میکنند.
1@Test
2public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException {
3
4 Exchanger<Queue<String>> readerExchanger = new Exchanger<>();
5 Exchanger<Queue<String>> writerExchanger = new Exchanger<>();
6
7 Runnable reader = () -> {
8 Queue<String> readerBuffer = new ConcurrentLinkedQueue<>();
9 while (true) {
10 readerBuffer.add(UUID.randomUUID().toString());
11 if (readerBuffer.size() >= BUFFER_SIZE) {
12 readerBuffer = readerExchanger.exchange(readerBuffer);
13 }
14 }
15 };
16
17 Runnable processor = () -> {
18 Queue<String> processorBuffer = new ConcurrentLinkedQueue<>();
19 Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
20 processorBuffer = readerExchanger.exchange(processorBuffer);
21 while (true) {
22 writerBuffer.add(processorBuffer.poll());
23 if (processorBuffer.isEmpty()) {
24 processorBuffer = readerExchanger.exchange(processorBuffer);
25 writerBuffer = writerExchanger.exchange(writerBuffer);
26 }
27 }
28 };
29
30 Runnable writer = () -> {
31 Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
32 writerBuffer = writerExchanger.exchange(writerBuffer);
33 while (true) {
34 System.out.println(writerBuffer.poll());
35 if (writerBuffer.isEmpty()) {
36 writerBuffer = writerExchanger.exchange(writerBuffer);
37 }
38 }
39 };
40 CompletableFuture.allOf(
41 runAsync(reader),
42 runAsync(processor),
43 runAsync(writer)).join();
44}
در مثال فوق، سه نخ به نامهای reader، processor و writer داریم. این سه نخ در مجموع به صورت یک pipeline منفرد عمل میکند که دادهها را بین خود مبادله میکنند. readerExchanger بین نخهای reader و processor مشترک است، در حالی که writerExchanger بین نخهای processor و writer به اشتراک درآمده است.
توجه کنید که این مثال، تنها به منظور تفهیم مطلب ارائه شده است. در عمل باید در مورد ایجاد حلقههای نامتناهی با while(true) مراقب باشیم. همچنین برای این که کد خوانایی بنویسیم، از مدیریت برخی استثناها خودداری کردهایم. این الگو برای مبادله دادهها در عین استفاده مجدد از بافر به ما امکان میدهد که Garbage Collection کمتری داشته باشیم. متد exchange وهلههای صف یکسانی را بازگشت میدهد و از این رو میتوان برای این اشیا از GC استفاده نکرد. برخلاف دیگر صفهای مسدودسازی، exchanger هیچ گروه یا شیئی برای نگهداری و اشتراک دادهها ایجاد نمیکند.
ایجاد یک چنین pipeline مشابه الگوی Disrupter است و تنها یک تفاوت کوچک به این صورت دارد که الگوی Disrupter از چند تولیدکننده و مصرفکننده پشتیبانی میکند، در حالی که exchanger میتواند بین یک جفت از تولیدکنندهها و مصرفکنندهها مورد استفاده قرار گیرد.
سخن پایانی
در این مقاله با <Exchanger<T در جاوا آشنا شدیم و طرز کار آن را شناختیم و شیوه استفاده از کلاس Exchanger را نیز در عمل مشاهده کردیم. همچنین یک pipeline ساخته و مبادله داده بدون GC را بین نخها آزمودیم.