提交 307c290b 编写于 作者: T tanghai

修复actor消息模型一个大bug

上级 2b3b655b
......@@ -80,12 +80,7 @@ namespace Model
return false;
}
if (message.AMessage is ARequest request)
{
request.RpcId = message.RpcId;
}
return await handler.Handle(session, entity, message.AMessage);
return await handler.Handle(session, entity, message);
}
public override void Dispose()
......
......@@ -198,8 +198,8 @@ namespace Model
{
if (!this.lockDict.ContainsKey(key))
{
Log.Info($"location get key: {key}");
this.locations.TryGetValue(key, out int location);
Log.Info($"location get key: {key} {location}");
return Task.FromResult(location);
}
......
......@@ -2,11 +2,18 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson.Serialization.Attributes;
namespace Model
{
public abstract class ActorTask
{
[BsonIgnore]
public ActorProxy proxy;
[BsonElement]
public AMessage message;
public abstract Task<AResponse> Run();
public abstract void RunFail(int error);
......@@ -17,9 +24,6 @@ namespace Model
/// </summary>
public class ActorMessageTask: ActorTask
{
private readonly ActorProxy proxy;
private readonly AMessage message;
public ActorMessageTask(ActorProxy proxy, AMessage message)
{
this.proxy = proxy;
......@@ -44,9 +48,7 @@ namespace Model
/// <typeparam name="Response"></typeparam>
public class ActorRpcTask<Response> : ActorTask where Response: AResponse
{
private readonly ActorProxy proxy;
private readonly ARequest message;
[BsonIgnore]
public readonly TaskCompletionSource<Response> Tcs = new TaskCompletionSource<Response>();
public ActorRpcTask(ActorProxy proxy, ARequest message)
......@@ -102,7 +104,7 @@ namespace Model
public int WindowSize = 1;
// 最大窗口
public const int MaxWindowSize = 100;
public const int MaxWindowSize = 1;
private TaskCompletionSource<ActorTask> tcs;
......@@ -131,7 +133,7 @@ namespace Model
private void Remove()
{
this.RunningTasks.Dequeue();
ActorTask task = this.RunningTasks.Dequeue();
this.AllowGet();
}
......@@ -142,10 +144,12 @@ namespace Model
return;
}
var t = this.tcs;
this.tcs = null;
ActorTask task = this.WaitingTasks.Dequeue();
this.RunningTasks.Enqueue(task);
var t = this.tcs;
this.tcs = null;
t.SetResult(task);
}
......@@ -157,7 +161,7 @@ namespace Model
this.RunningTasks.Enqueue(task);
return Task.FromResult(task);
}
this.tcs = new TaskCompletionSource<ActorTask>();
return this.tcs.Task;
}
......@@ -247,6 +251,7 @@ namespace Model
{
try
{
//Log.Debug($"realcall {MongoHelper.ToJson(request)} {this.Address}");
request.Id = this.Id;
Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
Response response = await session.Call<Response>(request, cancellationToken);
......@@ -259,6 +264,16 @@ namespace Model
}
}
public string DebugQueue(Queue<ActorTask> tasks)
{
string s = "";
foreach (ActorTask task in tasks)
{
s += $" {task.message.GetType().Name}";
}
return s;
}
public override void Dispose()
{
if (this.Id == 0)
......
......@@ -7,12 +7,12 @@ namespace Model
{
protected abstract Task<bool> Run(E entity, Message message);
public async Task<bool> Handle(Session session, Entity entity, AMessage msg)
public async Task<bool> Handle(Session session, Entity entity, ActorRequest message)
{
Message message = msg as Message;
if (message == null)
Message msg = message.AMessage as Message;
if (msg == null)
{
Log.Error($"消息类型转换错误: {msg.GetType().FullName} to {typeof (Message).Name}");
Log.Error($"消息类型转换错误: {message.GetType().FullName} to {typeof (Message).Name}");
return false;
}
E e = entity as E;
......@@ -21,7 +21,20 @@ namespace Model
Log.Error($"Actor类型转换错误: {entity.GetType().Name} to {typeof(E).Name}");
return false;
}
return await this.Run(e, message);
await this.Run(e, msg);
// 等回调回来,session可以已经断开了,所以需要判断session id是否为0
if (session.Id == 0)
{
return false;
}
ActorResponse response = new ActorResponse
{
RpcId = message.RpcId
};
session.Reply(response);
return true;
}
public Type GetMessageType()
......@@ -42,11 +55,11 @@ namespace Model
protected abstract Task<bool> Run(E entity, Request message, Action<Response> reply);
public async Task<bool> Handle(Session session, Entity entity, AMessage message)
public async Task<bool> Handle(Session session, Entity entity, ActorRequest message)
{
try
{
Request request = message as Request;
Request request = message.AMessage as Request;
if (request == null)
{
Log.Error($"消息类型转换错误: {message.GetType().FullName} to {typeof (Request).Name}");
......@@ -65,7 +78,7 @@ namespace Model
{
return;
}
response.RpcId = request.RpcId;
response.RpcId = message.RpcId;
session.Reply(response);
});
}
......
......@@ -5,7 +5,7 @@ namespace Model
{
public interface IMActorHandler
{
Task<bool> Handle(Session session, Entity entity, AMessage message);
Task<bool> Handle(Session session, Entity entity, ActorRequest message);
Type GetMessageType();
}
}
\ No newline at end of file
......@@ -39,7 +39,7 @@ namespace Hotfix
Session gateSession = Hotfix.Scene.ModelScene.GetComponent<NetOuterComponent>().Create(r2CLogin.Address);
G2C_LoginGate g2CLoginGate = await gateSession.Call<G2C_LoginGate>(new C2G_LoginGate(r2CLogin.Key));
Log.Debug($"{JsonHelper.ToJson(g2CLoginGate)}");
//Log.Debug($"{JsonHelper.ToJson(g2CLoginGate)}");
Log.Info("登陆gate成功!");
......

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26730.10
MinimumVisualStudioVersion = 10.0.40219.1
# Visual Studio 2017
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Unity.Plugins", "Unity.Plugins.csproj", "{D1FDB199-0FB7-099D-3771-C6A942E4E326}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Unity", "Unity.csproj", "{CF118143-7E37-744F-BE45-3F55345FEC40}"
......@@ -37,7 +35,4 @@ Global
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {F4CA14D3-C715-499C-A877-DF4AB6A05C1E}
EndGlobalSection
EndGlobal
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册