提交 ff6488b8 编写于 作者: B Boris Djurdjevic

Added IDataReader in config for streaming

上级 348659a7
......@@ -11,6 +11,7 @@
<ItemGroup>
<PackageReference Include="DelegateDecompiler.EntityFrameworkCore5" Version="0.32.0" />
<PackageReference Include="EntityFrameworkCore.SqlServer.HierarchyId" Version="4.0.0" />
<PackageReference Include="FastMember" Version="1.5.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite.NetTopologySuite" Version="7.0.8" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="7.0.8" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer.NetTopologySuite" Version="7.0.8" />
......
using EFCore.BulkExtensions.SqlAdapters;
using FastMember;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using NetTopologySuite.Geometries;
......@@ -1435,4 +1436,24 @@ public class EFCoreBulkTestAtypical
};
//context.BulkRead(list2, bulkConfig); // Throws: 'The required column 'xmin' was not present in the results of a 'FromSql' operation.'
}
[Theory]
[InlineData(SqlType.SqlServer)]
private void DataReaderTest(SqlType sqlType)
{
ContextUtil.DatabaseType = sqlType;
using var context = new TestContext(ContextUtil.GetOptions());
context.Truncate<Customer>();
var entities = new List<Customer>
{
new Customer { Name = "Cust 1" },
new Customer { Name = "Cust 2" },
};
using (var reader = ObjectReader.Create(entities))
context.BulkInsert(new List<Customer>(), new BulkConfig { DataReader = reader }); // , EnableStreaming = true
}
}
using Microsoft.Data.SqlClient;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq.Expressions;
......@@ -54,11 +55,6 @@ public class BulkConfig
/// </summary>
public int? BulkCopyTimeout { get; set; }
/// <summary>
/// Propagated to SqlBulkCopy util object.
/// </summary>
public bool EnableStreaming { get; set; }
/// <summary>
/// When set to <c>true</c> Temp tables are created as #Temporary. More info: <c>https://www.sqlservertutorial.net/sql-server-basics/sql-server-temporary-tables/</c>
/// </summary>
......@@ -96,6 +92,16 @@ public class BulkConfig
/// </summary>
public Dictionary<string, string>? CustomSourceDestinationMappingColumns { get; set; }
/// <summary>
/// When configured data is loaded from this object instead of entity list which should be empty
/// </summary>
public IDataReader? DataReader { get; set; }
/// <summary>
/// Can be used when DataReader ia also configured and when set it is propagated to SqlBulkCopy util object.
/// </summary>
public bool EnableStreaming { get; set; }
/// <summary>
/// Can be set to True if want to have tracking of entities from BulkRead or when SetOutputIdentity is set.
/// </summary>
......
......@@ -22,7 +22,8 @@ internal static class DbContextBulkTransaction
operationType != OperationType.InsertOrUpdateOrDelete &&
operationType != OperationType.Truncate &&
operationType != OperationType.SaveChanges &&
(bulkConfig == null || bulkConfig.CustomSourceTableName == null))
(bulkConfig == null || bulkConfig.CustomSourceTableName == null) &&
(bulkConfig == null || bulkConfig.DataReader == null))
{
return;
}
......
......@@ -56,13 +56,21 @@ public class MySqlAdapter : ISqlOperationsAdapter
SetMySqlBulkCopyConfig(mySqlBulkCopy, tableInfo);
var dataTable = GetDataTable(context, type, entities, mySqlBulkCopy, tableInfo);
IDataReader? dataReader = tableInfo.BulkConfig.DataReader;
if (isAsync)
{
await mySqlBulkCopy.WriteToServerAsync(dataTable, cancellationToken).ConfigureAwait(false);
if(dataReader == null)
await mySqlBulkCopy.WriteToServerAsync(dataTable, cancellationToken).ConfigureAwait(false);
else
await mySqlBulkCopy.WriteToServerAsync(dataReader, cancellationToken).ConfigureAwait(false);
}
else
{
mySqlBulkCopy.WriteToServer(dataTable);
if (dataReader == null)
mySqlBulkCopy.WriteToServer(dataTable);
else
mySqlBulkCopy.WriteToServer(dataReader);
}
}
finally
......
......@@ -6,6 +6,7 @@ using NetTopologySuite.IO;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.IO;
using System.Linq;
using System.Reflection;
......@@ -60,13 +61,21 @@ public class SqlServerAdapter : ISqlOperationsAdapter
try
{
var dataTable = GetDataTable(context, type, entities, sqlBulkCopy, tableInfo);
IDataReader? dataReader = tableInfo.BulkConfig.DataReader;
if (isAsync)
{
await sqlBulkCopy.WriteToServerAsync(dataTable, cancellationToken).ConfigureAwait(false);
if(dataReader == null)
await sqlBulkCopy.WriteToServerAsync(dataTable, cancellationToken).ConfigureAwait(false);
else
await sqlBulkCopy.WriteToServerAsync(dataReader, cancellationToken).ConfigureAwait(false);
}
else
{
sqlBulkCopy.WriteToServer(dataTable);
if (dataReader == null)
sqlBulkCopy.WriteToServer(dataTable);
else
sqlBulkCopy.WriteToServer(dataReader);
}
}
catch (InvalidOperationException ex)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册