fix: eliminate more gateway bottlenecks

This commit is contained in:
Hampus Kraft
2026-02-18 17:27:51 +00:00
parent dee08930f4
commit 261d4bd84c
5 changed files with 262 additions and 23 deletions

View File

@@ -20,10 +20,11 @@
-include_lib("fluxer_gateway/include/timeout_config.hrl").
-export([start_link/0]).
-export([start_link/0, start_or_lookup/1, start_or_lookup/2, lookup/1, lookup/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(GUILD_PID_CACHE, guild_pid_cache).
-define(SHARD_TABLE, guild_manager_shard_table).
-type guild_id() :: integer().
-type shard_map() :: #{pid := pid(), ref := reference()}.
@@ -36,18 +37,45 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec start_or_lookup(guild_id()) -> {ok, pid()} | {error, term()}.
start_or_lookup(GuildId) ->
start_or_lookup(GuildId, ?DEFAULT_GEN_SERVER_TIMEOUT).
-spec start_or_lookup(guild_id(), pos_integer()) -> {ok, pid()} | {error, term()}.
start_or_lookup(GuildId, Timeout) ->
call_shard(GuildId, {start_or_lookup, GuildId}, Timeout).
-spec lookup(guild_id()) -> {ok, pid()} | {error, term()}.
lookup(GuildId) ->
lookup(GuildId, ?DEFAULT_GEN_SERVER_TIMEOUT).
-spec lookup(guild_id(), pos_integer()) -> {ok, pid()} | {error, term()}.
lookup(GuildId, Timeout) ->
case lookup_cached_guild_pid(GuildId) of
{ok, GuildPid} ->
{ok, GuildPid};
not_found ->
call_shard(GuildId, {lookup, GuildId}, Timeout)
end.
-spec init(list()) -> {ok, state()}.
init([]) ->
process_flag(trap_exit, true),
ensure_shard_table(),
ets:new(?GUILD_PID_CACHE, [named_table, public, set, {read_concurrency, true}]),
{ShardCount, _Source} = determine_shard_count(),
ShardMap = start_shards(ShardCount),
{ok, #{shards => ShardMap, shard_count => ShardCount}}.
State = #{shards => ShardMap, shard_count => ShardCount},
sync_shard_table(State),
{ok, State}.
-spec handle_call(term(), gen_server:from(), state()) -> {reply, term(), state()}.
handle_call({start_or_lookup, GuildId}, _From, State) ->
{Reply, NewState} = forward_call(GuildId, {start_or_lookup, GuildId}, State),
{reply, Reply, NewState};
handle_call({lookup, GuildId}, _From, State) ->
{Reply, NewState} = forward_call(GuildId, {lookup, GuildId}, State),
{reply, Reply, NewState};
handle_call({stop_guild, GuildId}, _From, State) ->
{Reply, NewState} = forward_call(GuildId, {stop_guild, GuildId}, State),
{reply, Reply, NewState};
@@ -106,11 +134,13 @@ terminate(_Reason, State) ->
end,
maps:values(Shards)
),
catch ets:delete(?SHARD_TABLE),
catch ets:delete(?GUILD_PID_CACHE),
ok.
-spec code_change(term(), term(), term()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) when is_map(State) ->
sync_shard_table(State),
{ok, State}.
-spec determine_shard_count() -> {pos_integer(), configured | auto}.
@@ -150,6 +180,7 @@ start_shard(Index) ->
case guild_manager_shard:start_link(Index) of
{ok, Pid} ->
Ref = erlang:monitor(process, Pid),
put_shard_pid(Index, Pid),
{ok, #{pid => Pid, ref => Ref}};
Error ->
Error
@@ -161,25 +192,48 @@ restart_shard(Index, State) ->
case start_shard(Index) of
{ok, Shard} ->
Updated = State#{shards => maps:put(Index, Shard, Shards)},
sync_shard_table(Updated),
{Shard, Updated};
{error, _Reason} ->
clear_shard_pid(Index),
DummyPid = spawn(fun() -> ok end),
Dummy = #{pid => DummyPid, ref => make_ref()},
{Dummy, State}
end.
-spec call_shard(guild_id(), term(), pos_integer()) -> term().
call_shard(GuildId, Request, Timeout) ->
case shard_pid_from_table(GuildId) of
{ok, Pid} ->
case catch gen_server:call(Pid, Request, Timeout) of
{'EXIT', {timeout, _}} ->
{error, timeout};
{'EXIT', _} ->
call_via_manager(Request, Timeout);
Reply ->
maybe_cache_guild_pid(GuildId, Request, Reply)
end;
error ->
call_via_manager(Request, Timeout)
end.
-spec call_via_manager(term(), pos_integer()) -> term().
call_via_manager(Request, Timeout) ->
gen_server:call(?MODULE, Request, Timeout + 1000).
-spec forward_call(guild_id(), term(), state()) -> {term(), state()}.
forward_call(GuildId, {start_or_lookup, _} = Request, State) ->
case ets:lookup(?GUILD_PID_CACHE, GuildId) of
[{GuildId, GuildPid}] when is_pid(GuildPid) ->
case erlang:is_process_alive(GuildPid) of
true ->
{{ok, GuildPid}, State};
false ->
ets:delete(?GUILD_PID_CACHE, GuildId),
forward_call_to_shard(GuildId, Request, State)
end;
[] ->
case lookup_cached_guild_pid(GuildId) of
{ok, GuildPid} ->
{{ok, GuildPid}, State};
not_found ->
forward_call_to_shard(GuildId, Request, State)
end;
forward_call(GuildId, {lookup, _} = Request, State) ->
case lookup_cached_guild_pid(GuildId) of
{ok, GuildPid} ->
{{ok, GuildPid}, State};
not_found ->
forward_call_to_shard(GuildId, Request, State)
end;
forward_call(GuildId, Request, State) ->
@@ -200,12 +254,8 @@ forward_call_to_shard(GuildId, Request, State) ->
{_Shard, State2} = restart_shard(Index, State1),
forward_call_to_shard(GuildId, Request, State2)
end;
{ok, GuildPid} = Reply ->
ets:insert(?GUILD_PID_CACHE, {GuildId, GuildPid}),
erlang:monitor(process, GuildPid),
{Reply, State1};
Reply ->
{Reply, State1}
{maybe_cache_guild_pid(GuildId, Request, Reply), State1}
end.
-spec ensure_shard(guild_id(), state()) -> {non_neg_integer(), state()}.
@@ -290,6 +340,99 @@ handle_reload_all(GuildIds, State) ->
group_ids_by_shard(GuildIds, ShardCount) ->
rendezvous_router:group_keys(GuildIds, ShardCount).
-spec ensure_shard_table() -> ok.
ensure_shard_table() ->
case ets:whereis(?SHARD_TABLE) of
undefined ->
_ = ets:new(?SHARD_TABLE, [named_table, public, set, {read_concurrency, true}]),
ok;
_ ->
ok
end.
-spec sync_shard_table(state()) -> ok.
sync_shard_table(State) ->
ensure_shard_table(),
_ = ets:delete_all_objects(?SHARD_TABLE),
ShardCount = maps:get(shard_count, State),
ets:insert(?SHARD_TABLE, {shard_count, ShardCount}),
Shards = maps:get(shards, State),
lists:foreach(
fun({Index, #{pid := Pid}}) ->
put_shard_pid(Index, Pid)
end,
maps:to_list(Shards)
),
ok.
-spec put_shard_pid(non_neg_integer(), pid()) -> ok.
put_shard_pid(Index, Pid) ->
ensure_shard_table(),
ets:insert(?SHARD_TABLE, {{shard_pid, Index}, Pid}),
ok.
-spec clear_shard_pid(non_neg_integer()) -> ok.
clear_shard_pid(Index) ->
try ets:delete(?SHARD_TABLE, {shard_pid, Index}) of
_ ->
ok
catch
error:badarg ->
ok
end.
-spec shard_pid_from_table(guild_id()) -> {ok, pid()} | error.
shard_pid_from_table(GuildId) ->
try
case ets:lookup(?SHARD_TABLE, shard_count) of
[{shard_count, ShardCount}] when is_integer(ShardCount), ShardCount > 0 ->
Index = select_shard(GuildId, ShardCount),
case ets:lookup(?SHARD_TABLE, {shard_pid, Index}) of
[{{shard_pid, Index}, Pid}] when is_pid(Pid) ->
case erlang:is_process_alive(Pid) of
true ->
{ok, Pid};
false ->
error
end;
_ ->
error
end;
_ ->
error
end
catch
error:badarg ->
error
end.
-spec lookup_cached_guild_pid(guild_id()) -> {ok, pid()} | not_found.
lookup_cached_guild_pid(GuildId) ->
case catch ets:lookup(?GUILD_PID_CACHE, GuildId) of
[{GuildId, GuildPid}] when is_pid(GuildPid) ->
case erlang:is_process_alive(GuildPid) of
true ->
{ok, GuildPid};
false ->
ets:delete(?GUILD_PID_CACHE, GuildId),
not_found
end;
_ ->
not_found
end.
-spec maybe_cache_guild_pid(guild_id(), term(), term()) -> term().
maybe_cache_guild_pid(GuildId, {start_or_lookup, GuildId}, {ok, GuildPid} = Reply)
when is_pid(GuildPid)
->
ets:insert(?GUILD_PID_CACHE, {GuildId, GuildPid}),
Reply;
maybe_cache_guild_pid(GuildId, {lookup, GuildId}, {ok, GuildPid} = Reply) when is_pid(GuildPid) ->
ets:insert(?GUILD_PID_CACHE, {GuildId, GuildPid}),
Reply;
maybe_cache_guild_pid(_GuildId, _Request, Reply) ->
Reply.
-spec find_shard_by_ref(reference(), #{non_neg_integer() => shard_map()}) ->
{ok, non_neg_integer()} | not_found.
find_shard_by_ref(Ref, Shards) ->
@@ -422,4 +565,60 @@ cleanup_guild_from_cache_does_not_remove_new_pid_test() ->
catch ets:delete(guild_pid_cache)
end.
start_or_lookup_uses_shard_table_without_manager_test_() ->
{timeout, 10, fun() ->
catch ets:delete(guild_pid_cache),
catch ets:delete(guild_manager_shard_table),
ets:new(guild_pid_cache, [named_table, public, set, {read_concurrency, true}]),
ets:new(guild_manager_shard_table, [named_table, public, set, {read_concurrency, true}]),
GuildId = 101,
GuildPid = spawn(fun() -> timer:sleep(1000) end),
ShardPid = spawn(fun() -> shard_stub_loop(GuildId, GuildPid) end),
ets:insert(guild_manager_shard_table, {shard_count, 1}),
ets:insert(guild_manager_shard_table, {{shard_pid, 0}, ShardPid}),
try
?assertEqual({ok, GuildPid}, start_or_lookup(GuildId))
after
ShardPid ! stop,
catch ets:delete(guild_manager_shard_table),
catch ets:delete(guild_pid_cache)
end
end}.
call_shard_timeout_returns_error_timeout_test_() ->
{timeout, 10, fun() ->
catch ets:delete(guild_pid_cache),
catch ets:delete(guild_manager_shard_table),
ets:new(guild_pid_cache, [named_table, public, set, {read_concurrency, true}]),
ets:new(guild_manager_shard_table, [named_table, public, set, {read_concurrency, true}]),
GuildId = 202,
SlowShardPid = spawn(fun() -> slow_shard_loop() end),
ets:insert(guild_manager_shard_table, {shard_count, 1}),
ets:insert(guild_manager_shard_table, {{shard_pid, 0}, SlowShardPid}),
try
?assertEqual({error, timeout}, call_shard(GuildId, {start_or_lookup, GuildId}, 20))
after
SlowShardPid ! stop,
catch ets:delete(guild_manager_shard_table),
catch ets:delete(guild_pid_cache)
end
end}.
shard_stub_loop(GuildId, GuildPid) ->
receive
stop ->
ok;
{'$gen_call', From, {start_or_lookup, GuildId}} ->
gen_server:reply(From, {ok, GuildPid}),
shard_stub_loop(GuildId, GuildPid);
{'$gen_call', From, {lookup, GuildId}} ->
gen_server:reply(From, {ok, GuildPid}),
shard_stub_loop(GuildId, GuildPid);
{'$gen_call', From, _Request} ->
gen_server:reply(From, {error, unsupported}),
shard_stub_loop(GuildId, GuildPid);
_ ->
shard_stub_loop(GuildId, GuildPid)
end.
-endif.