Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

在 asp.net core 中使用 UnitOfWorkManager 实现多种事务传播 #289

Closed
2881099 opened this issue Apr 23, 2020 · 17 comments
Closed

在 asp.net core 中使用 UnitOfWorkManager 实现多种事务传播 #289

2881099 opened this issue Apr 23, 2020 · 17 comments
Labels
docs This is a document good first issue Good for newcomers

Comments

@2881099
Copy link
Collaborator

2881099 commented Apr 23, 2020

本篇文章内容引导,如何在 asp.net core 项目中使用特性(注解) 的方式管理事务。

UnitOfWorkManager 只可以管理 Repository 仓储对象的事务,直接 fsql.Insert<T>() 是不行的!!但是可以用 repository.Orm.Insert<T>!!repository.Orm 是特殊实现的 IFreeSql,与 当前事务保持一致。

支持六种传播方式(propagation),意味着跨方法的事务非常方便,并且支持同步异步:

  • Requierd:如果当前没有事务,就新建一个事务,如果已存在一个事务中,加入到这个事务中,默认的选择。
  • Supports:支持当前事务,如果没有当前事务,就以非事务方法执行。
  • Mandatory:使用当前事务,如果没有当前事务,就抛出异常。
  • NotSupported:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
  • Never:以非事务方式执行操作,如果当前事务存在则抛出异常。
  • Nested:以嵌套事务方式执行。

第一步:引入动态代理库

肉夹馍:https://github.com/inversionhourglass/Rougamo

dotnet add package Rougamo.Fody

[AttributeUsage(AttributeTargets.Method)]
public class TransactionalAttribute : Rougamo.MoAttribute
{
    public Propagation Propagation { get; set; } = Propagation.Required;
    public IsolationLevel IsolationLevel { get => m_IsolationLevel.Value; set => m_IsolationLevel = value; }
    IsolationLevel? m_IsolationLevel;

    static AsyncLocal<IServiceProvider> m_ServiceProvider = new AsyncLocal<IServiceProvider>();
    public static void SetServiceProvider(IServiceProvider serviceProvider) => m_ServiceProvider.Value = serviceProvider;

    IUnitOfWork _uow;
    public override void OnEntry(MethodContext context)
    {
        var uowManager = m_ServiceProvider.Value.GetService(typeof(UnitOfWorkManager)) as UnitOfWorkManager;
        _uow = uowManager.Begin(this.Propagation, this.m_IsolationLevel);
    }
    public override void OnExit(MethodContext context)
    {
        if (typeof(Task).IsAssignableFrom(context.RealReturnType))
            ((Task)context.ReturnValue).ContinueWith(t => _OnExit());
        else _OnExit();

        void _OnExit()
        {
            try
            {
                if (context.Exception == null) _uow.Commit();
                else _uow.Rollback();
            }
            finally
            {
                _uow.Dispose();
            }
        }
    }
}
UnitOfWorkManager 成员 说明
IUnitOfWork Current 返回当前的工作单元
void Binding(repository) 将仓储的事务交给它管理
IUnitOfWork Begin(propagation, isolationLevel) 创建工作单元

第二步:配置 Startup.cs 注入、中间件

//Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton<IFreeSql>(fsql);
    services.AddScoped<UnitOfWorkManager>();
    services.AddFreeRepository(null, typeof(Startup).Assembly);
   //批量注入 Service
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    app.Use(async (context, next) =>
    {
        TransactionalAttribute.SetServiceProvider(context.RequestServices);
        await next();
    });
}

第三步:在 Controller 或者 Service 或者 Repository 中使用事务特性

public class SongService
{
    readonly IBaseRepository<Song> _repoSong;
    readonly IBaseRepository<Detail> _repoDetail;
    readonly SongRepository _repoSong2;

    public SongService(IBaseRepository<Song> repoSong, IBaseRepository<Detail> repoDetail, SongRepository repoSong2)
    {
        _repoSong = repoSong;
        _repoDetail = repoDetail;
        _repoSong2 = repoSong2;
    }

    [Transactional]
    public virtual void Test1()
    {
        //这里 _repoSong、_repoDetail、_repoSong2 所有操作都是一个工作单元
        this.Test2();
    }

    [Transactional(Propagation = Propagation.Nested)]
    public virtual void Test2() //嵌套事务,新的(不使用 Test1 的事务)
    {
        //这里 _repoSong、_repoDetail、_repoSong2 所有操作都是一个工作单元
    }
}

是不是进方法就开事务呢?

不一定是真实事务,有可能是虚的,就是一个假的 unitofwork(不带事务)

也有可能是延用上一次的事务

也有可能是新开事务,具体要看传播模式

重写仓储实现

以上使用的是泛型仓储,那我们如果是重写一个仓储 如何保持和UnitOfWorkManager同一个事务呢。
继承现有的DefaultRepository<,>仓储,实现自定义的仓储SongRepository.cs,

public class SongRepository : DefaultRepository<Song, int>, ISongRepository
{
    public SongRepository(UnitOfWorkManager uowm) : base(uowm?.Orm, uowm)
    {
    }
    public List<Song> GetSongs()
    {
        return Select.Page(1, 10).ToList();
    }
}

其中接口。ISongRepository.cs

public interface ISongRepository : IBaseRepository<Song, int>
{
    List<Song> GetSongs();
}

在 startup.cs 注入此服务

services.AddScoped<ISongRepository, SongRepository>();
@2881099 2881099 changed the title 在 asp.net core 中使用 UnitOfWorkManager 多种事务传播方式 在 asp.net core 中使用 UnitOfWorkManager 实现多种事务传播 Apr 23, 2020
@2881099 2881099 added the good first issue Good for newcomers label Apr 23, 2020
@yus1977
Copy link

yus1977 commented Jul 6, 2020

事务传播使用过程中发现一个BUG
如下面的代码

public void FunMain(){
     //代码1:
     //事务处理方法
      FuncA();

      //代码2:
      //读取、前面FuncA中添加、更新的数据
       .......
}

[Transaction]
public virtual  void FunA(){
      //代码2.1
      //查询数据,或不访问数据库
      .........

      //代码2.2
      //添加、更新数据到数据库
      FuncA_1();
}

[Transaction]
public virtual  void FuncA_1(){
      //代码2.2.1
      //添加、更新数据到数据库
      .......
}

执行完“代码1”后,“代码2”访问不到前面的事务中提交的数据,只有当方法FunMain全部结束后,数据才真正的提交到数据库。
我分析原因可能如下:
1、方法FunA和FuncA_1都标记了事务
2、首先进入FunA,UnitOfWorkManager标记启动事务,
3、“代码2.1”到“代码2.2”之间并没有对数据库执行“增、删、改”操作,所以并没有真正启动事务
4、然后“代码2.2”中,FunA中调用了FuncA_1,FuncA_1中传递了工作单元,UnitOfWorkManager标记启动事务
5、FuncA_1中对数据库执行了“增、删、改”操作,真正启动了事务
6、最后,FuncA_1执行完成提交事务,FunA执行完成提交事务

在第3步并没有真正启动事务,可能导致了Bug,执行完第6步后并没有真正把数据提交到数据库。
如果在第3步中,增加修改数据库的代码,那么第3步时就会真正启动事务,执行完第6步后,数据库里就有提交的数据。

@yus1977
Copy link

yus1977 commented Jul 6, 2020

另外请问,如果启用了主从库。
当启用事务时,会自动读取主库吗?
如果类似上面第3步的情况,会自动读取主库吗?

@yus1977
Copy link

yus1977 commented Jul 6, 2020

我确认了一下,开启事务或使用工作单元标记事务后,都是默认读取主库

@2881099
Copy link
Collaborator Author

2881099 commented Jul 6, 2020

有没有demo,我试试

@yus1977
Copy link

yus1977 commented Jul 6, 2020

我是直接在现有的项目中测试的,Demo一下子剥离不出来

@2881099
Copy link
Collaborator Author

2881099 commented Jul 6, 2020

TransactionAttribute 的 动态代理,有执行 OnBefore(Begin) OnAfter(Commit) 吗

@2881099
Copy link
Collaborator Author

2881099 commented Jul 6, 2020

fsql.Aop.TraceBefore
fsql.Aop.TraceAfter

这两个事件监视起来

@yus1977
Copy link

yus1977 commented Jul 6, 2020

OK,我试试 fsql.Aop.TraceBefore, fsql.Aop.TraceAfter

@yus1977
Copy link

yus1977 commented Jul 6, 2020

上面的那段代码,
fsql.Aop.TraceBefore:执行“代码1“时,启动了一次事务
fsql.Aop.TraceAfter:执行完”代码1“时,这个跟踪没有被触发,事务没有成功提交

@yus1977
Copy link

yus1977 commented Jul 6, 2020

经过测试,类似上面的代码,在同一个类中
在标记事务的方法内,调用另一个标记事务方法时。
跟踪结果是:
fsql.Aop.TraceBefore:事务在第一次时启动了
fsql.Aop.TraceAfter:方法执行结束后,事务并没有真正提交。只有当前线程执行结束后,才会真正提交事务。

与我猜测的,第3步是否更新数据库没有关系。

@2881099
Copy link
Collaborator Author

2881099 commented Jul 6, 2020

其实可以换个测试方式

var uowm = new UnitOfWorkManager(fsql);
var repo = new DefultRepository<t>(fsql, uowm);

using (var uow1 = uow.Begin())
{

using (var uow2 = uow.Begin())
{
repo.Insert(...);
uow2.Commit();
}
repo.Where(...).ToList(); //是否变化
uow1.Commit();
}

像这样整理一个 demo 传上来

@2881099
Copy link
Collaborator Author

2881099 commented Jul 6, 2020

@yus1977 这样测试可以吗?

@yus1977
Copy link

yus1977 commented Jul 6, 2020

@yus1977 这样测试可以吗?

OK,我找时间测试一下

@yus1977
Copy link

yus1977 commented Jul 7, 2020

经过测试,上面的代码没有问题,UnitOfWorkManager本身没有问题。
是我这边的问题,写的AOP有BUG。
我用的是Autofac,所以照着前面的TransactionalAttribute 重新写了AOP,TransactionInterceptor。
我的事务在Service对象上,当同一个对象多个标记了事务的方法嵌套时,抄过来的代码就会出问题

原因如下:
因为一个Servier实例通过AOP编织后,绑定的TransactionInterceptor是同一个实例。
如下面的代码中

IUnitOfWork _uow;
........
Task OnBefore(UnitOfWorkManager uowm)
    {
        _uow = uowm.Begin(this.Propagation, this.IsolationLevel);
        return Task.FromResult(false);
    }

同一个实例多次启用事务后,_uow会被后面启用的事务覆盖。
而第二次以后启动的事务创建的的工作单元是UnitOfWorkVirtual。
因此不能简单的使用变量来存储创建的IUnitOfWork

@yus1977
Copy link

yus1977 commented Jul 7, 2020

下面是完整的代码
AOP使用是的是 Autofac +Castle.DynamicProxy + Castle.Core.AsyncInterceptor

    /// <summary>
    /// 事务拦截器TranAOP, 支持同步和异步方法
    /// </summary>
    public class TransactionInterceptor : AsyncInterceptorBase
    {
        public TransactionInterceptor(UnitOfWorkManager uowManager, IConfiguration configuration)
        {
            _uowManager = uowManager;

            this.configuration = configuration;
            transactionMonitor = configuration["ContextAutoSetup:TransactionMonitor"].ToBool();
        }

        private readonly UnitOfWorkManager _uowManager;
        private readonly IConfiguration configuration;
        bool transactionMonitor = false;

        /// <summary>
        /// 无返回值的 异步/同步 方法拦截
        /// </summary>
        /// <param name="invocation"></param>
        /// <param name="proceed"></param>
        /// <returns></returns>
        protected override async Task InterceptAsync(IInvocation invocation, Func<IInvocation, Task> proceed)
        {
            var methodInfo = invocation.MethodInvocationTarget ?? invocation.Method;
            //对当前方法的特性验证
            if (methodInfo.HasAttribute<TransactionAttribute>())
            {
                using (var unitOfWork = BeginTransaction(methodInfo))
                {
                    try
                    {
                        await proceed(invocation).ConfigureAwait(false);

                        CommitTransaction(unitOfWork);
                    }
                    catch (Exception ex)
                    {
                        RollbackTransaction(unitOfWork);
                        throw ex;
                    }
                }
            }
            else
                await proceed(invocation).ConfigureAwait(false);
        }

        /// <summary>
        /// 有返回值的 异步/同步 方法拦截
        /// </summary>
        /// <param name="invocation"></param>
        /// <param name="proceed"></param>
        /// <returns></returns>
        protected override async Task<TResult> InterceptAsync<TResult>(IInvocation invocation, Func<IInvocation, Task<TResult>> proceed)
        {
            var methodInfo = invocation.MethodInvocationTarget ?? invocation.Method;
            //对当前方法的特性验证
            if (methodInfo.HasAttribute<TransactionAttribute>())
            {
                using (var unitOfWork = BeginTransaction(methodInfo))
                {
                    try
                    {
                        var result = await proceed(invocation).ConfigureAwait(false);

                        CommitTransaction(unitOfWork);

                        return result;
                    }
                    catch (Exception ex)
                    {
                        RollbackTransaction(unitOfWork);
                        throw ex;
                    }
                }
            }
            else
                return await proceed(invocation).ConfigureAwait(false);
        }

        /// <summary>
        /// 开始事务
        /// </summary>
        /// <param name="methodInfo"></param>
        private IUnitOfWork BeginTransaction(MethodInfo methodInfo)
        {
            WriteConsole(0);

            TransactionAttribute attribute = methodInfo.GetCustomAttribute<TransactionAttribute>();
            //启动事务
            return _uowManager.Begin(attribute.Propagation, attribute.IsolationLevel);
        }

        /// <summary>
        /// //提交事务
        /// </summary>
        private void CommitTransaction(IUnitOfWork unitOfWork)
        {
            unitOfWork.Commit();

            WriteConsole(1);
        }

        /// <summary>
        /// 回滚事务
        /// </summary>
        private void RollbackTransaction(IUnitOfWork unitOfWork)
        {
            unitOfWork.Rollback();

            WriteConsole(2);
        }

        /// <summary>
        /// 控制台输出信息
        /// </summary>
        /// <param name="msg"></param>
        /// <param name="type">0开始事务,1提交事务,2回滚事务</param>
        private void WriteConsole(int type)
        {
            if (transactionMonitor)
            {
                string msg = "";
                switch (type)
                {
                    case 0:
                        msg = $"***Transaction Begin***\n";
                        break;
                    case 1:
                        msg = $"***Transactio Commit***\n";
                        break;
                    case 2:
                        msg = $"***Transaction* Rollback**\n";
                        break;
                }

                ConsoleHelper.WriteSuccessLine(msg);
            }
        }
    }
}

@dotnetcore dotnetcore deleted a comment from yus1977 Jul 7, 2020
@2881099 2881099 closed this as completed Nov 24, 2020
@luoyunchong luoyunchong added the docs This is a document label Nov 24, 2020
@2881099
Copy link
Collaborator Author

2881099 commented Jan 7, 2021

IdleBus + UnitOfWorkManager

默认 UnitOfWorkManager 只能绑定一个 FreeSql 使用,所以 Scoped 生命范围内不切换的情况下,可以实现 IdleBus + UnitOfWorkManager 以来实现。

第一步:定义 IdleBus 扩展方法

public static class IdleBusExtesions
{
    static AsyncLocal<string> AsyncLocalTenantId = new AsyncLocal<string>();
    public static IdleBus<IFreeSql> ChangeTenant(this IdleBus<IFreeSql> ib, string tenantId)
    {
        AsyncLocalTenantId.Value = tenantId;
        return ib;
    }
    public static IFreeSql Get(this IdleBus<IFreeSql> ib) => ib.Get(AsyncLocalTenantId.Value ?? "default");
    public static IBaseRepository<T> GetRepository<T>(this IdleBus<IFreeSql> ib) where T : class => ib.Get().GetRepository<T>();
}

第二步:定义 IFreeSql 代理类实现

class FreeSqlProxy : IFreeSql
{
    readonly IFreeSql _orm;

    public FreeSqlProxy(IFreeSql fsql)
    {
        _orm = fsql;
    }

    public IAdo Ado => _orm.Ado;
    public IAop Aop => _orm.Aop;
    public ICodeFirst CodeFirst => _orm.CodeFirst;
    public IDbFirst DbFirst => _orm.DbFirst;
    public GlobalFilter GlobalFilter => _orm.GlobalFilter;
    //关键在此处,释放无任何操作
    public void Dispose() { }

    public void Transaction(Action handler) => _orm.Transaction(handler);
    public void Transaction(IsolationLevel isolationLevel, Action handler) => _orm.Transaction(isolationLevel, handler);

    public ISelect<T1> Select<T1>() where T1 : class
    {
        return _orm.Select<T1>();
    }
    public ISelect<T1> Select<T1>(object dywhere) where T1 : class => Select<T1>().WhereDynamic(dywhere);

    public IDelete<T1> Delete<T1>() where T1 : class
    {
        return _orm.Delete<T1>();
    }
    public IDelete<T1> Delete<T1>(object dywhere) where T1 : class => Delete<T1>().WhereDynamic(dywhere);

    public IUpdate<T1> Update<T1>() where T1 : class
    {
        return _orm.Update<T1>();
    }
    public IUpdate<T1> Update<T1>(object dywhere) where T1 : class => Update<T1>().WhereDynamic(dywhere);

    public IInsert<T1> Insert<T1>() where T1 : class
    {
        return _orm.Insert<T1>();
    }
    public IInsert<T1> Insert<T1>(T1 source) where T1 : class => Insert<T1>().AppendData(source);
    public IInsert<T1> Insert<T1>(T1[] source) where T1 : class => Insert<T1>().AppendData(source);
    public IInsert<T1> Insert<T1>(List<T1> source) where T1 : class => Insert<T1>().AppendData(source);
    public IInsert<T1> Insert<T1>(IEnumerable<T1> source) where T1 : class => Insert<T1>().AppendData(source);

    public IInsertOrUpdate<T1> InsertOrUpdate<T1>() where T1 : class
    {
        return _orm.InsertOrUpdate<T1>();
    }
}

第三步:改造 Startup.cs 注入

public static IdleBus<IFreeSql> ib = new IdleBus<IFreeSql>();

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllersWithViews();

    //ib.Register(..);
    //注意此时注入 AddScoped IFreeSql
    services.AddScoped<IFreeSql>(serviceProvider => new FreeSqlProxy(ib.Get()));
    services.AddScoped<UnitOfWorkManager>();
    services.AddFreeRepository(null, typeof(Startup).Assembly);
}

第四步:使用方法请参照 一楼

@dotnetcore dotnetcore deleted a comment from yus1977 Aug 19, 2022
@km93522719
Copy link

km93522719 commented Sep 6, 2022

.net 6.0,按照上面的式例,在方法添加Transactional特性,但是没有生效,不是进入事务里,这个是什么原因引起吗?
Service相关代码:
[Transactional]
private async Task SaveFlowInfoAsync(FlowInfoAddInput flowInfoInput, CancellationToken cancellationToken)
{
//using IUnitOfWork unitOfWork = _unitOfWorkManager.Begin(isolationLevel: IsolationLevel.Snapshot);
//try
//{
await this._flowDetailService.AddAsync(flowInfoInput, cancellationToken);
//其他业务逻辑
。。。

//    unitOfWork.Commit();
//}
//catch (Exception e)
//{
//    unitOfWork.Rollback();
//    LogUtils.LogErr(this, $"流程提交异常:{e.Message}。PARAMS:{flowInfoInput.FlowSubmitInput.ToJson()}", e);
//    throw new CustomException(ExceptionConst.FlowSubmitFail);
//}

}

programe.cs代码:
var app = builder.Build();

//使用Correlation中间件
app.UseCorrelationId();
if (app.Environment.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
//使用自定义的异常中间件
app.UseErrorHandling();
app.UseStaticFiles();
app.UseLocalLogViewer();
app.Use(async (context, next) =>
{
TransactionalAttribute.SetServiceProvider(context.RequestServices);
await next();
});
//使用路由中间件
app.UseRouting()
//使用终端中间件
.UseEndpoints(endpoints =>
{
endpoints.MapGet("/", async context =>
{
await context.Response.WriteAsync("Hello World!");
});
endpoints.MapControllers();
});

app.Run();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
docs This is a document good first issue Good for newcomers
Projects
None yet
Development

No branches or pull requests

4 participants