zhaolei
2020-11-20 921de2254ff5712a44ed8575ee8efe34252f6603
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
using PetaPoco.Core;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
 
namespace PetaPoco
{
    /// <summary>
    /// SQLServer 数据库实现类
    /// </summary>
    internal class SqlServerBatchProvider : BatchProvider
    {
        private static readonly Func<IDatabase, SqlConnection> SqlConnectionResolver = db => (SqlConnection)db.Connection;
        private static readonly Func<IDatabase, SqlTransaction> SqlTransactionResolver = db => (SqlTransaction)db.Transaction;
 
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="db">IDatabase 实例</param>
        public SqlServerBatchProvider(IDatabase db) : base(db)
        {
            SupportBulk = true;
        }
 
        /// <summary>
        /// 获得/设置 条目阈值 默认 512
        /// </summary>
        public int Limit { get; set; } = 512;
 
        /// <summary>
        /// 批次插入业务实体类,根据Limit自动判断是Bulk插入还是Batch插入
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="pocos"></param>
        protected override void InsertBulk<T>(IEnumerable<T> pocos)
        {
            var pd = PocoData.ForType(typeof(T), Database.DefaultMapper);
 
            if (pocos.Count() >= Limit)
            {
                InsertBulkImpl(pd, pocos);
            }
            else
            {
                InsertBatch(pd, pocos);
            }
        }
 
        /// <summary>
        /// 批次插入业务实体类,根据Limit自动判断是Bulk插入还是Batch插入
        /// </summary>
        /// <param name="tableName"></param>
        /// <param name="primaryKeyName"></param>
        /// <param name="autoIncrement"></param>
        /// <param name="pocos"></param>
        protected override void InsertBulk(string tableName, string primaryKeyName, bool autoIncrement, IEnumerable<object> pocos)
        {
            var pd = PocoData.ForType(pocos.First().GetType(), Database.DefaultMapper);
            pd.TableInfo.TableName = tableName;
            if (!string.IsNullOrEmpty(primaryKeyName)) pd.TableInfo.PrimaryKey = primaryKeyName;
            pd.TableInfo.AutoIncrement = autoIncrement;
 
            if (pocos.Count() >= Limit)
            {
                InsertBulkImpl(pd, pocos);
            }
            else
            {
                InsertBatch(pd, pocos);
            }
        }
 
        private void InsertBulkImpl<T>(PocoData pd, IEnumerable<T> pocos)
        {
            try
            {
                Database.OpenSharedConnection();
                using (var bulkCopy = new SqlBulkCopy(SqlConnectionResolver(Database), SqlBulkCopyOptions.Default, SqlTransactionResolver(Database)))
                {
                    var table = new DataTable();
                    var cols = pd.Columns.Where(x => !x.Value.ResultColumn
                        && !(pd.TableInfo.AutoIncrement && x.Value.ColumnName.Equals(pd.TableInfo.PrimaryKey, StringComparison.OrdinalIgnoreCase))).ToList();
 
                    foreach (var col in cols)
                    {
                        bulkCopy.ColumnMappings.Add(col.Value.PropertyInfo.Name, col.Value.ColumnName);
                        table.Columns.Add(col.Value.PropertyInfo.Name, Nullable.GetUnderlyingType(col.Value.PropertyInfo.PropertyType) ?? col.Value.PropertyInfo.PropertyType);
                    }
 
                    foreach (var item in pocos)
                    {
                        var values = new object[cols.Count];
                        for (var i = 0; i < values.Length; i++)
                        {
                            var value = cols[i].Value.GetValue(item);
                            if (Database.DefaultMapper != null)
                            {
                                var fn = Database.DefaultMapper.GetToDbConverter(cols[i].Value.PropertyInfo);
                                if (fn != null) value = fn(value);
                            }
                            values[i] = value;
                        }
                        table.Rows.Add(values);
                    }
 
                    bulkCopy.DestinationTableName = pd.TableInfo.TableName;
                    bulkCopy.WriteToServer(table);
                }
            }
            finally
            {
                Database.CloseSharedConnection();
            }
        }
    }
}