add Rate Limiter & fix
This commit is contained in:
@@ -18,10 +18,7 @@ import io.vertx.ext.web.sstore.redis.RedisSessionStore;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import su.xserver.iikocon.config.AppConfig;
|
||||
import su.xserver.iikocon.handler.AdminHandler;
|
||||
import su.xserver.iikocon.handler.AuthHandler;
|
||||
import su.xserver.iikocon.handler.SecurityHandler;
|
||||
import su.xserver.iikocon.handler.SetupHandler;
|
||||
import su.xserver.iikocon.handler.*;
|
||||
import su.xserver.iikocon.iiko.IikoOlapClient;
|
||||
import su.xserver.iikocon.service.*;
|
||||
|
||||
@@ -63,12 +60,10 @@ public class MainVerticle extends AbstractVerticle {
|
||||
db = new DataBaseService(vertx, config.database);
|
||||
redis = new RedisService(vertx, config.redis);
|
||||
|
||||
// Инициализация сервисов
|
||||
userService = new UserService(db.getPool());
|
||||
restaurantService = new RestaurantService(db.getPool());
|
||||
settingsService = new SettingsService(db.getPool());
|
||||
|
||||
// Инициализация БД (создание таблицы users)
|
||||
userService.initDatabase().onFailure(err -> {
|
||||
log.error("Failed to initialize database", err);
|
||||
startPromise.fail(err);
|
||||
@@ -125,7 +120,7 @@ public class MainVerticle extends AbstractVerticle {
|
||||
|
||||
SecurityHandler securityHandlers = new SecurityHandler(settingsService);
|
||||
|
||||
// Обработчики безопасности (порядок важен)
|
||||
// Обработчики безопасности
|
||||
router.route().handler(securityHandlers.hostValidator());
|
||||
router.route().handler(securityHandlers.proxyHeadersHandler());
|
||||
router.route().handler(securityHandlers.cspHeader());
|
||||
@@ -161,6 +156,13 @@ public class MainVerticle extends AbstractVerticle {
|
||||
}
|
||||
});
|
||||
|
||||
// Rate Limiter Handler
|
||||
RedisRateLimiter limiter = new RedisRateLimiter(
|
||||
redis.getRedis(), 60, 60_000
|
||||
);
|
||||
|
||||
router.route().handler(limiter);
|
||||
|
||||
// Health Checks
|
||||
HealthCheckService healthCheckService = new HealthCheckService(vertx, redis, db);
|
||||
healthCheckService.registerHealthCheck(router);
|
||||
@@ -182,7 +184,6 @@ public class MainVerticle extends AbstractVerticle {
|
||||
rc.response().setStatusCode(403).end(new JsonObject().put("error", "Registration is disabled").encode());
|
||||
return;
|
||||
}
|
||||
// существующий код регистрации
|
||||
JsonObject body = rc.body().asJsonObject();
|
||||
String login = body.getString("login");
|
||||
String email = body.getString("email");
|
||||
@@ -197,7 +198,6 @@ public class MainVerticle extends AbstractVerticle {
|
||||
.onFailure(err -> rc.response().setStatusCode(500).end(err.getMessage()));
|
||||
}));
|
||||
|
||||
// В initRouter после настройки authHandler, до объявления /api/admin/*:
|
||||
router.route("/api/profile").handler(authHandler::requireAuth);
|
||||
router.get("/api/profile").handler(rc -> {
|
||||
Integer userId = rc.session().get("userId");
|
||||
@@ -218,29 +218,8 @@ public class MainVerticle extends AbstractVerticle {
|
||||
})
|
||||
.onFailure(err -> rc.response().setStatusCode(500).end(err.getMessage()));
|
||||
});
|
||||
router.put("/api/admin/language").handler(rc -> {
|
||||
Integer userId = rc.session().get("userId");
|
||||
JsonObject body = rc.body().asJsonObject();
|
||||
String language = body.getString("language");
|
||||
if (language == null || (!"en".equals(language) && !"ru".equals(language))) {
|
||||
rc.response().setStatusCode(400).end("Invalid language");
|
||||
return;
|
||||
}
|
||||
userService.updateLanguage(userId, language)
|
||||
.onSuccess(v -> {
|
||||
rc.session().put("language", language);
|
||||
rc.response().end(new JsonObject().put("success", true).encode());
|
||||
})
|
||||
.onFailure(err -> rc.response().setStatusCode(500).end(err.getMessage()));
|
||||
});
|
||||
|
||||
// Затем существующий блок router.route("/api/admin/*").handler(authHandler::requireAuth);
|
||||
router.route("/api/admin/*").handler(authHandler::requireAuth);
|
||||
// Добавить проверку роли для чувствительных эндпоинтов:
|
||||
// router.route("/api/settings/meta*").handler(AdminHandler::requireAdmin);
|
||||
// router.route("/api/admin/settings*").handler(AdminHandler::requireAdmin);
|
||||
// router.route("/api/admin/active-sessions").handler(AdminHandler::requireAdmin);
|
||||
|
||||
router.get("/api/admin/users").handler(rc -> userService.getAllUsers().onComplete(ar -> {
|
||||
if (ar.succeeded()) {
|
||||
rc.response()
|
||||
@@ -251,6 +230,7 @@ public class MainVerticle extends AbstractVerticle {
|
||||
}
|
||||
}));
|
||||
|
||||
router.route("/api/admin/users*").handler(AdminHandler::requireAdmin);
|
||||
router.post("/api/admin/users").handler(rc -> {
|
||||
JsonObject body = rc.body().asJsonObject();
|
||||
String login = body.getString("login");
|
||||
@@ -394,14 +374,12 @@ public class MainVerticle extends AbstractVerticle {
|
||||
.onFailure(err -> rc.response().setStatusCode(500).end(err.getMessage()));
|
||||
});
|
||||
|
||||
// Получение всех настроек
|
||||
router.get("/api/settings").handler(rc -> {
|
||||
settingsService.getPublicSettings()
|
||||
.onSuccess(settings -> rc.response().putHeader("Content-Type", "application/json").end(settings.encode()))
|
||||
.onFailure(err -> rc.response().setStatusCode(500).end());
|
||||
});
|
||||
|
||||
// Получить метаданные всех настроек (для построения формы)
|
||||
router.route("/api/admin/settings*").handler(AdminHandler::requireAdmin);
|
||||
router.get("/api/admin/settings/meta").handler(rc -> {
|
||||
settingsService.getMetadata()
|
||||
@@ -409,14 +387,12 @@ public class MainVerticle extends AbstractVerticle {
|
||||
.onFailure(err -> rc.response().setStatusCode(500).end(err.getMessage()));
|
||||
});
|
||||
|
||||
// Получить все настройки со значениями по умолчанию
|
||||
router.get("/api/admin/settings").handler(rc -> {
|
||||
settingsService.getAllWithDefaults()
|
||||
.onSuccess(settings -> rc.response().putHeader("Content-Type", "application/json").end(settings.encode()))
|
||||
.onFailure(err -> rc.response().setStatusCode(500).end(err.getMessage()));
|
||||
});
|
||||
|
||||
// Обновление настроек (админ)
|
||||
router.put("/api/admin/settings").handler(rc -> {
|
||||
JsonObject body = rc.body().asJsonObject();
|
||||
List<Future<Void>> futures = new ArrayList<>(); // явно указываем тип Future<Void>
|
||||
|
||||
185
src/main/java/su/xserver/iikocon/handler/RedisRateLimiter.java
Normal file
185
src/main/java/su/xserver/iikocon/handler/RedisRateLimiter.java
Normal file
@@ -0,0 +1,185 @@
|
||||
package su.xserver.iikocon.handler;
|
||||
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import io.vertx.redis.client.Command;
|
||||
import io.vertx.redis.client.Redis;
|
||||
import io.vertx.redis.client.Request;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class RedisRateLimiter implements Handler<RoutingContext> {
|
||||
|
||||
private final Logger logger;
|
||||
private final Redis redis;
|
||||
private final int limitPerWindow;
|
||||
private final long windowMillis;
|
||||
private static final String PREFIX = "ip:limit:";
|
||||
|
||||
// Основной кэш: clientKey -> время окончания блокировки
|
||||
private final ConcurrentHashMap<String, Long> blockedClients = new ConcurrentHashMap<>();
|
||||
// Индекс по времени: время окончания -> множество клиентов
|
||||
private final ConcurrentSkipListMap<Long, Set<String>> expiryIndex = new ConcurrentSkipListMap<>();
|
||||
private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
private final AtomicLong allowedRequests = new AtomicLong(0);
|
||||
private final AtomicLong blockedRequests = new AtomicLong(0);
|
||||
private final AtomicLong redisCalls = new AtomicLong(0);
|
||||
private final AtomicLong redisFailures = new AtomicLong(0);
|
||||
private final AtomicLong totalRedisLatency = new AtomicLong(0);
|
||||
private final AtomicLong redisLatencyCount = new AtomicLong(0);
|
||||
|
||||
// Частота блокировок по IP
|
||||
private final ConcurrentHashMap<String, AtomicLong> blockedByClient = new ConcurrentHashMap<>();
|
||||
|
||||
public RedisRateLimiter(Redis redis, int limitPerWindow, long windowMillis) {
|
||||
this.logger = LoggerFactory.getLogger("[RedisRateLimiter]");
|
||||
this.redis = redis;
|
||||
this.limitPerWindow = limitPerWindow;
|
||||
this.windowMillis = windowMillis;
|
||||
|
||||
// Периодическая очистка только истёкших блокировок
|
||||
cleaner.scheduleAtFixedRate(this::cleanupExpiredClients, windowMillis, windowMillis / 2, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(RoutingContext context) {
|
||||
String clientKey = getClientKey(context);
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
// Проверяем локальную блокировку
|
||||
Long blockedUntil = blockedClients.get(clientKey);
|
||||
if (blockedUntil != null) {
|
||||
if (blockedUntil > now) {
|
||||
blockedRequests.incrementAndGet();
|
||||
incrementBlockCount(clientKey);
|
||||
sendTooManyRequests(context);
|
||||
return;
|
||||
} else {
|
||||
unblockClient(clientKey, blockedUntil);
|
||||
}
|
||||
}
|
||||
|
||||
String redisKey = PREFIX + clientKey;
|
||||
checkRateLimit(context, redisKey, clientKey);
|
||||
}
|
||||
|
||||
private void checkRateLimit(RoutingContext context, String redisKey, String clientKey) {
|
||||
String luaScript = """
|
||||
local key = KEYS[1]
|
||||
local limit = tonumber(ARGV[1])
|
||||
local ttl = tonumber(ARGV[2])
|
||||
|
||||
local current = redis.call('INCR', key)
|
||||
if current == 1 then
|
||||
redis.call('PEXPIRE', key, ttl)
|
||||
end
|
||||
|
||||
if current > limit then
|
||||
return 'TOO_MANY_REQUESTS'
|
||||
else
|
||||
return 'OK'
|
||||
end
|
||||
""";
|
||||
|
||||
redisCalls.incrementAndGet();
|
||||
long start = System.nanoTime();
|
||||
|
||||
Request request = Request.cmd(Command.EVAL)
|
||||
.arg(luaScript)
|
||||
.arg(1)
|
||||
.arg(redisKey)
|
||||
.arg(limitPerWindow)
|
||||
.arg(windowMillis);
|
||||
|
||||
redis.send(request)
|
||||
.onSuccess(response -> {
|
||||
long duration = System.nanoTime() - start;
|
||||
redisLatencyCount.incrementAndGet();
|
||||
totalRedisLatency.addAndGet(TimeUnit.NANOSECONDS.toMillis(duration));
|
||||
|
||||
String result = response.toString();
|
||||
if ("TOO_MANY_REQUESTS".equals(result)) {
|
||||
blockClient(clientKey);
|
||||
blockedRequests.incrementAndGet();
|
||||
incrementBlockCount(clientKey);
|
||||
sendTooManyRequests(context);
|
||||
} else {
|
||||
allowedRequests.incrementAndGet();
|
||||
context.next();
|
||||
}
|
||||
}).onFailure(error -> {
|
||||
redisFailures.incrementAndGet();
|
||||
context.response()
|
||||
.setStatusCode(503)
|
||||
.putHeader("Content-Type", "application/json")
|
||||
.end(new JsonObject()
|
||||
.put("error", "503 Service Unavailable")
|
||||
.put("message", "Redis is not connected")
|
||||
.encodePrettily()
|
||||
);
|
||||
|
||||
logger.error(error.getMessage());
|
||||
});
|
||||
}
|
||||
|
||||
private void blockClient(String clientKey) {
|
||||
long blockedUntil = System.currentTimeMillis() + windowMillis;
|
||||
blockedClients.put(clientKey, blockedUntil);
|
||||
expiryIndex.computeIfAbsent(blockedUntil, t -> ConcurrentHashMap.newKeySet()).add(clientKey);
|
||||
}
|
||||
|
||||
private void unblockClient(String clientKey, long expiryTime) {
|
||||
blockedClients.remove(clientKey);
|
||||
Set<String> clients = expiryIndex.get(expiryTime);
|
||||
if (clients != null) {
|
||||
clients.remove(clientKey);
|
||||
if (clients.isEmpty()) {
|
||||
expiryIndex.remove(expiryTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void incrementBlockCount(String clientKey) {
|
||||
blockedByClient.computeIfAbsent(clientKey, k -> new AtomicLong(0)).incrementAndGet();
|
||||
}
|
||||
|
||||
private void cleanupExpiredClients() {
|
||||
long now = System.currentTimeMillis();
|
||||
// Получаем все записи, у которых время истечения <= now
|
||||
NavigableMap<Long, Set<String>> expired = expiryIndex.headMap(now, true);
|
||||
|
||||
if (expired.isEmpty()) return;
|
||||
|
||||
for (Map.Entry<Long, Set<String>> entry : expired.entrySet()) {
|
||||
Set<String> clients = entry.getValue();
|
||||
for (String client : clients) {
|
||||
blockedClients.remove(client);
|
||||
}
|
||||
}
|
||||
|
||||
expired.clear(); // очищаем диапазон из индекса
|
||||
}
|
||||
|
||||
private void sendTooManyRequests(RoutingContext context) {
|
||||
context.response()
|
||||
.setStatusCode(429)
|
||||
.putHeader("Content-Type", "application/json")
|
||||
.end(new JsonObject()
|
||||
.put("error", "429 Too Many Requests")
|
||||
.put("message", "Try again later")
|
||||
.encodePrettily()
|
||||
);
|
||||
}
|
||||
|
||||
private String getClientKey(RoutingContext context) {
|
||||
return context.request().remoteAddress().host().replace(':', '.');
|
||||
}
|
||||
}
|
||||
@@ -65,7 +65,6 @@ public class IikoOlapClient {
|
||||
.addQueryParam("key", token)
|
||||
.send()
|
||||
.onSuccess(resp -> {
|
||||
// log.info("Logout completed for token, status {}", resp.statusCode());
|
||||
log.info(resp.bodyAsString());
|
||||
promise.complete();
|
||||
})
|
||||
|
||||
@@ -5,7 +5,6 @@ import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.ext.healthchecks.Status;
|
||||
import io.vertx.ext.web.Router;
|
||||
import io.vertx.ext.web.healthchecks.HealthCheckHandler;
|
||||
import su.xserver.iikocon.iiko.IikoOlapClient;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
@@ -56,21 +55,7 @@ public class HealthCheckService {
|
||||
.onFailure(err -> future.tryFail("DataBase ping failed: " + err.getMessage()));
|
||||
});
|
||||
|
||||
// healthCheckHandler.register("iiko", future -> {
|
||||
//
|
||||
// IikoOlapClient iiko = new IikoOlapClient(vertx, "folk-amber-co.iiko.it", "4444", "92f2fd99879b0c2466ab8648afb63c49032379c1", true);
|
||||
//
|
||||
// iiko.checkConnection()
|
||||
// .onSuccess(res -> {
|
||||
// JsonObject data = new JsonObject()
|
||||
// .put("name", "iiko")
|
||||
// .put("latency_ms", res.getLong("latency_ms"));
|
||||
// future.complete(Status.OK(data));
|
||||
// })
|
||||
// .onFailure(err -> future.tryFail("iiko ping failed: " + err.getMessage()));
|
||||
// });
|
||||
|
||||
// Регистрируем endpoint /api/health
|
||||
// Endpoint /api/health
|
||||
router.get("/api/health").handler(healthCheckHandler);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,7 +176,6 @@ public class UserService {
|
||||
}
|
||||
|
||||
if (setClauses.isEmpty()) {
|
||||
// Ни одно поле не обновляется — возвращаем успешный Future
|
||||
return Future.succeededFuture();
|
||||
}
|
||||
|
||||
@@ -184,12 +183,6 @@ public class UserService {
|
||||
return SqlTemplate.forUpdate(pool, sql).execute(params).mapEmpty();
|
||||
}
|
||||
|
||||
public Future<Void> updateLanguage(int userId, String language) {
|
||||
return SqlTemplate.forUpdate(pool, "UPDATE users SET language = #{lang} WHERE id = #{id}")
|
||||
.execute(Map.of("id", userId, "lang", language))
|
||||
.mapEmpty();
|
||||
}
|
||||
|
||||
public boolean checkPassword(String plain, String hash) {
|
||||
try {
|
||||
return BCrypt.checkpw(plain, hash);
|
||||
|
||||
Reference in New Issue
Block a user