package com.example.demo.controller;
import com.example.demo.model.User;
import com.example.demo.service.ReactiveUserService;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
@RestController
@RequestMapping("/api/reactive/users")
public class ReactiveUserController {
private final ReactiveUserService userService;
public ReactiveUserController(ReactiveUserService userService) {
this.userService = userService;
}
@GetMapping("/{id}")
public Mono<User> getUserById(@PathVariable Long id) {
return userService.findById(id)
.timeout(Duration.ofSeconds(5))
.onErrorResume(e -> Mono.empty());
}
@GetMapping
public Flux<User> getAllUsers() {
return userService.findAll()
.delayElements(Duration.ofMillis(100));
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return userService.findAll()
.delayElements(Duration.ofSeconds(1));
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<User> createUser(@RequestBody User user) {
return userService.save(user);
}
@PutMapping("/{id}")
public Mono<User> updateUser(@PathVariable Long id, @RequestBody User user) {
return userService.findById(id)
.flatMap(existing -> {
existing.setName(user.getName());
existing.setEmail(user.getEmail());
return userService.save(existing);
});
}
@DeleteMapping("/{id}")
public Mono<Void> deleteUser(@PathVariable Long id) {
return userService.deleteById(id);
}
@GetMapping("/search")
public Flux<User> searchUsers(@RequestParam String query) {
return userService.findByNameContaining(query)
.take(10)
.filter(user -> user.isActive());
}
}
package com.example.demo.service;
import com.example.demo.model.User;
import com.example.demo.repository.ReactiveUserRepository;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class ReactiveUserService {
private final ReactiveUserRepository repository;
public ReactiveUserService(ReactiveUserRepository repository) {
this.repository = repository;
}
public Mono<User> findById(Long id) {
return repository.findById(id);
}
public Flux<User> findAll() {
return repository.findAll();
}
public Mono<User> save(User user) {
return repository.save(user);
}
public Mono<Void> deleteById(Long id) {
return repository.deleteById(id);
}
public Flux<User> findByNameContaining(String name) {
return repository.findByNameContainingIgnoreCase(name);
}
public Mono<Long> countActiveUsers() {
return repository.findAll()
.filter(User::isActive)
.count();
}
public Flux<User> getRecentUsers(int limit) {
return repository.findAll()
.sort((u1, u2) -> u2.getCreatedAt().compareTo(u1.getCreatedAt()))
.take(limit);
}
}
Spring WebFlux enables reactive, non-blocking applications using Project Reactor. Mono represents 0-1 elements, Flux represents 0-N elements. Reactive programming handles high concurrency with fewer threads. I use reactive repositories with R2DBC for non-blocking database access. Operators transform streams—map, flatMap, filter, zip. Backpressure prevents overwhelming consumers. WebClient replaces RestTemplate for reactive HTTP calls. Reactive applications excel in I/O-bound scenarios—microservices, streaming, real-time data. Error handling uses onErrorResume, retry, timeout. Schedulers control execution context. WebFlux integrates with reactive security, actuator, and messaging. The programming model differs from imperative code—thinking in streams and transformations. Properly designed reactive systems achieve higher throughput and resilience.