CompletableFuture
Detailní dokumentace k asynchronním operacím pomocí CompletableFuture v Hytale.
---
Přehled
CompletableFuture
│
├── runAsync() - Void operace na ForkJoinPool
├── supplyAsync() - Vrací hodnotu
│
├── thenApply() - Transformace výsledku
├── thenAccept() - Konzumace výsledku
├── thenRun() - Akce po dokončení
│
├── exceptionally() - Error handling
└── whenComplete() - Finální callback
---
CompletableFutureUtil
Hytale utility třída pro práci s futures:
public class CompletableFutureUtil { // Globální exception handler
public static final Function fn = throwable -> {
if (!(throwable instanceof TailedRuntimeException)) {
HytaleLogger.getLogger().at(Level.SEVERE)
.withCause(throwable)
.log("Unhandled exception! " + Thread.currentThread());
}
throw new TailedRuntimeException(throwable);
};
// Propojení dvou futures
@Nonnull
public static CompletableFuture whenComplete(
@Nonnull CompletableFuture future,
@Nonnull CompletableFuture callee
) {
return future.whenComplete((result, throwable) -> {
if (throwable != null) {
callee.completeExceptionally(throwable);
} else {
callee.complete(result);
}
});
}
// Kontrola zda je future zrušeno
public static boolean isCanceled(Throwable throwable) {
return throwable instanceof CancellationException
|| (throwable instanceof CompletionException
&& throwable.getCause() != null
&& throwable.getCause() != throwable
&& isCanceled(throwable.getCause()));
}
// Automatický exception logging
@Nonnull
public static CompletableFuture _catch(@Nonnull CompletableFuture future) {
return future.exceptionally((Function)fn);
}
// Vytvoření zrušeného future
@Nonnull
public static CompletableFuture completionCanceled() {
CompletableFuture out = new CompletableFuture<>();
out.cancel(false);
return out;
}
}
---
joinWithProgress
Čekání na více futures s progress callback:
public static void joinWithProgress(
@Nonnull List> list,
@Nonnull ProgressConsumer callback,
int millisSleep,
int millisProgress
) throws InterruptedException {
CompletableFuture> all = CompletableFuture.allOf(
list.toArray(CompletableFuture[]::new)
); long last = System.nanoTime();
long nanosProgress = TimeUnit.MILLISECONDS.toNanos(millisProgress);
int listSize = list.size();
while (!all.isDone()) {
Thread.sleep(millisSleep);
long now;
if (last + nanosProgress < (now = System.nanoTime())) {
last = now;
int done = 0;
for (CompletableFuture c : list) {
if (c.isDone()) {
done++;
}
}
if (done < listSize) {
callback.accept((double)done / (double)listSize, done, listSize);
}
}
}
callback.accept(1.0, listSize, listSize);
all.join();
}
@FunctionalInterface
public interface ProgressConsumer {
void accept(double progress, int done, int total);
}
Příklad Použití
List> tasks = new ArrayList<>();
for (Player player : players) {
tasks.add(CompletableFuture.runAsync(() -> savePlayer(player)));
}CompletableFutureUtil.joinWithProgress(
tasks,
(progress, done, total) -> {
getLogger().info(String.format("Saving: %.0f%% (%d/%d)", progress * 100, done, total));
},
100, // Check every 100ms
1000 // Log every 1000ms
);
---
Základní Vzory
Jednorázový Async Task
CompletableFuture.runAsync(() -> {
// Těžká práce (I/O, databáze)
saveToDatabase(data);
});
S Návratovou Hodnotou
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return loadFromDatabase(uuid);
});// Později...
PlayerData data = future.join(); // Blokující
// nebo
future.thenAccept(data -> useData(data)); // Non-blokující
Řetězení Operací
CompletableFuture
.supplyAsync(() -> {
// Krok 1: Načti data
return loadRawData();
})
.thenApply(raw -> {
// Krok 2: Parsuj
return parseData(raw);
})
.thenApply(parsed -> {
// Krok 3: Validuj
return validateData(parsed);
})
.thenAccept(valid -> {
// Krok 4: Použij
world.execute(() -> applyData(valid));
})
.exceptionally(error -> {
getLogger().at(Level.SEVERE).withCause(error).log("Pipeline failed");
return null;
});
---
World Thread Integration
Async → World Thread
CompletableFuture.supplyAsync(() -> {
// Na async thread - I/O operace
return loadPlayerData(uuid);
}).thenAccept(data -> {
// POZOR: Stále na async thread! // Přepni na world thread pro komponenty
world.execute(() -> {
Ref ref = playerRef.getReference();
if (ref != null && ref.isValid()) {
Store store = ref.getStore();
// Bezpečný přístup ke komponentám
applyDataToPlayer(ref, store, data);
}
});
});
Čekání na World Operaci
public CompletableFuture performWorldOperation(World world, Runnable operation) {
CompletableFuture future = new CompletableFuture<>(); world.execute(() -> {
try {
operation.run();
future.complete(null);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}
---
Kombinování Futures
allOf - Všechny musí dokončit
List> saveFutures = new ArrayList<>();for (Player player : Universe.get().getPlayers()) {
CompletableFuture save = CompletableFuture.runAsync(() -> {
savePlayer(player.getUuid());
});
saveFutures.add(save);
}
// Počkej na všechny
CompletableFuture.allOf(saveFutures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
getLogger().info("All players saved!");
});
anyOf - První dokončený
CompletableFuture---
Error Handling
exceptionally
CompletableFuture
.supplyAsync(() -> riskyOperation())
.exceptionally(error -> {
getLogger().at(Level.WARNING).withCause(error).log("Operation failed");
return defaultValue; // Fallback
});
handle (Success + Error)
CompletableFuture
.supplyAsync(() -> loadData())
.handle((result, error) -> {
if (error != null) {
getLogger().at(Level.WARNING).withCause(error).log("Load failed");
return defaultData;
}
return result;
});
whenComplete (Nepřepisuje výsledek)
CompletableFuture
.supplyAsync(() -> loadData())
.whenComplete((result, error) -> {
if (error != null) {
// Loguj ale nepřepisuj
getLogger().at(Level.SEVERE).withCause(error).log("Error occurred");
} else {
getLogger().info("Loaded: " + result);
}
});
S CompletableFutureUtil
// Automatický logging všech výjimek
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return loadData();
});CompletableFutureUtil._catch(future); // Přidá exception handler
---
Timeout
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return slowOperation();
});try {
Data result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
// Fallback
}
S orTimeout (Java 9+)
CompletableFuture future = CompletableFuture
.supplyAsync(() -> slowOperation())
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(error -> {
if (error instanceof TimeoutException) {
return defaultData;
}
throw new CompletionException(error);
});
---
Příklady z Hytale
BackupTask Pattern
public class BackupTask {
@Nonnull
private final CompletableFuture completion = new CompletableFuture<>(); public static CompletableFuture start(@Nonnull Path universeDir, @Nonnull Path backupDir) {
BackupTask task = new BackupTask(universeDir, backupDir);
return task.completion;
}
private BackupTask(@Nonnull Path universeDir, @Nonnull Path backupDir) {
new Thread("Backup Runner") {
{
this.setDaemon(false);
}
@Override
public void run() {
BackupUtil.broadcastBackupStatus(true);
try {
// Vytvoř backup
Path backupZip = createBackup(universeDir, backupDir);
LOGGER.at(Level.INFO).log("Successfully created backup %s", backupZip);
completion.complete(null);
} catch (Throwable e) {
LOGGER.at(Level.SEVERE).withCause(e).log("Backup failed");
BackupUtil.broadcastBackupError(e);
completion.completeExceptionally(e);
} finally {
BackupUtil.broadcastBackupStatus(false);
}
}
}.start();
}
}
// Použití
BackupTask.start(universePath, backupPath)
.thenRun(() -> getLogger().info("Backup complete!"))
.exceptionally(e -> {
getLogger().severe("Backup failed: " + e.getMessage());
return null;
});
Config Loading
// Z PluginBase
@Nullable
public CompletableFuture preLoad() {
if (this.configs.isEmpty()) {
return null;
} CompletableFuture>[] futures = new CompletableFuture[this.configs.size()];
for (int i = 0; i < this.configs.size(); i++) {
futures[i] = this.configs.get(i).load();
}
return CompletableFuture.allOf(futures);
}
---
TaskRegistry Integration
Registrace futures pro automatické zrušení při shutdown:
@Override
protected void setup() {
// Registruj long-running task
CompletableFuture task = CompletableFuture.runAsync(() -> {
while (!Thread.currentThread().isInterrupted()) {
performPeriodicWork();
Thread.sleep(60000);
}
}); // Registrace zajistí cancel() při plugin shutdown
getTaskRegistry().registerTask(task);
}
---
Best Practices
1. Vždy Ošetři Výjimky
// Špatně - výjimka se ztratí
CompletableFuture.runAsync(() -> riskyOperation());// Správně
CompletableFuture.runAsync(() -> riskyOperation())
.exceptionally(e -> {
getLogger().at(Level.SEVERE).withCause(e).log("Operation failed");
return null;
});
2. World Thread pro Komponenty
// Špatně - přístup ke komponentám z async
CompletableFuture.runAsync(() -> {
store.getComponent(ref, Type.getComponentType()); // CRASH!
});// Správně
CompletableFuture.runAsync(() -> {
Data data = loadData();
world.execute(() -> {
store.getComponent(ref, Type.getComponentType()); // OK
});
});
3. Neblokuj na World Thread
// Špatně - blokuje world thread
world.execute(() -> {
future.join(); // NIKDY!
});// Správně - callback
future.thenAccept(result -> {
world.execute(() -> useResult(result));
});
---
Shrnutí
| Metoda | Účel |
|--------|------|
| runAsync() | Void async operace |
| supplyAsync() | Async s návratovou hodnotou |
| thenApply() | Transformace výsledku |
| thenAccept() | Konzumace výsledku |
| thenRun() | Akce po dokončení |
| exceptionally() | Error handling |
| handle() | Success + Error |
| whenComplete() | Finální callback |
| allOf() | Čekání na všechny |
| anyOf() | První dokončený |
| CompletableFutureUtil | Účel |
|----------------------|------|
| _catch() | Automatický exception logging |
| whenComplete() | Propojení dvou futures |
| isCanceled() | Kontrola zrušení |
| completionCanceled() | Vytvoření zrušeného future |
| joinWithProgress() | Čekání s progress callback |