关于数据同步的几种实现

https://blog.csdn.net/xuemoyao/article/details/14002209

概述

关于数据同步主要有两个层面的同步,一是通过后台程序编码实现数据同步,二是直接作用于数据库,在数据库层面实现数据的同步。通过程序编码实现数据同步,其主要的实现思路很容易理解,即有就更新,无则新增,其他情况日志记录,就不做过多的介绍,这里主要讲述的是第二个层面的数据同步,即在数据库层面实现数据同步。

数据库层面的数据库同步主要有三种方式:通过发布/订阅的方式实现同步,通过SQL JOB方式实现数据同步,通过Service Broker 消息队列的方式实现数据同步。

下面分别就这三种数据同步方式,一一详解。

1.    通过发布/订阅的方式实现同步

发布/订阅是Sql Server自带的一种数据库备份的机制,通过该机制可以快速的实现数据的备份同步,不用编写任何的代码。

此种数据同步的方式存在的以下的一些问题:

  1. 表结构不能更改,同步双方的表结构必须一致,一旦表结构发生更改需要重新生成数据库快照。
  2. 对于大数据量的同步没有可靠的保证。
  3. 网络不稳定的情况下同步也不能保证。

总的来说,这种数据备份同步的方式,在表结构一致、数据量不是特别大的情况下还是非常高效的一种同步方式。

网上有很多的关于如何使用发布/订阅的方式实现数据同步的操作示例,这里就不再重复的演示了,有兴趣想要了解的朋友可以参考下面这篇文章:

http://kb.cnblogs.com/page/103975/

2.    通过SQL JOB方式实现数据同步

通过Sql Job定时作业的方式实现同步其基本原理就是通过目标服务器和源服务器的连接,然后通过编写Sql语句,从源服务器中读取数据,再更新到目标服务器。

这种数据同步的方式比较灵活。创建过sql定时作业之后,主要需要执行以下关键的两步。

2.1     创建数据库连接(一般作为定时作业执行的第一步)

不同数据库之间的连接可以通过系统的存储过程实现。下面就直接用一个示例来讲一下如何创建数据库连接。

–添加一个连接

–系统存储过程sp_addlinkedserver 参数:

———————-1:目标服务器的IP或别名,本例中为:’WIN-S1PO3UA6J7I’;———————-2:” (srvproduct,默认);

———————-3:’SQLOLEDB’(provider,默认值);

———————-4:目标服务器的IP或别名(datasrc),本例中为:’WIN-S1PO3UA6J7I’

exec sp_addlinkedserver ‘WIN-S1PO3UA6J7I’,”,’SQLOLEDB’,’WIN-S1PO3UA6J7I’

–添加登录用户连接

–系统存储过程sp_addlinkedsrvlogin 参数:

———————-1:目标服务器的IP或别名,本例中为:’WIN-S1PO3UA6J7I’;

———————-2:’false’,默认值;

———————-3:null,默认值;

———————-4:’sa’,登录用户名;

———————-5:’pass@word1’,登录密码;

exec sp_addlinkedsrvlogin ‘WIN-S1PO3UA6J7I’,’false’,null,’sa’,’pass@word1′

创建数据库连接主要用到了以上的两个存储过程,但是在实际操作的过程中可能会遇到“仍有对服务器XXX的远程登录或连接登录问题”这样的问题,如果遇到此类问题,在执行上边的添加连接和登录用户连接之前还需要先删除某个已存在的链接,具体如下:

–系统存储过程sp_droplinkedsrvlogin 参数:

———————-1:目标服务器的IP或别名,本例中为:’WIN-S1PO3UA6J7I’;———————-2:null

exec sp_droplinkedsrvlogin ‘WIN-S1PO3UA6J7I’,null

–系统存储过程sp_dropserver 参数:

———————-1:目标服务器的IP或别名,本例中为:’WIN-S1PO3UA6J7I’

exec sp_dropserver ‘WIN-S1PO3UA6J7I’

2.2     使用SQL 语句 实现数据同步

主要的同步思路:

1:在目标数据库中先清空要同步的表的数据

2:使用insert into Table (Cloumn….) select Column….. from 服务器别名或IP.目标数据库名.dbo.TableName 的语法将数据从源数据库读取并插入到目标数据库

Truncate  table  Org_DepartmentsExt –删除现有系统中已存在的部门表

insert into Org_DepartmentsExt –从名为WIN-S1PO3UA6J7I的服务器上的DBFrom数据库上获取源数据,并同步到目标数据库中

(

[DeptID]

,[DeptStatus]

,[DeptTel]

,[DeptBrief]

,[DeptFunctions]

)

SELECT [DeptID]

,[DeptStatus]

,[DeptTel]

,[DeptBrief]

,[DeptFunctions]

FROM [WIN-S1PO3UA6J7I].[DBFrom].[dbo].[Org_DepartmentsExt]

以上这两步便是通过SQL Job实现数据同步的关键步骤,在完成以上两步之后,如果没有其他的表要进行同步,则可创建同步计划以完善定时作业。带作业创建完后,便可以执行。

这里主要只是演示了通过Sql Job方式实现数据同步的关键步骤。网上有很多具体的实例演示。有兴趣的朋友可以参考以下文章进行练习检验:

http://www.cnblogs.com/tyb1222/archive/2011/05/27/2060075.html

3.    通过SQL Server Service Broker 消息队列的方式实现数据同步

3.1 SQL Server Service Broker概述

SQL Server Service Broker 是数据库引擎的组成部分,为 SQL Server 提供队列和可靠的消息传递。既可用于使用单个 SQL Server 实例的应用程序,也可用于在多个实例间分发工作的应用程序。

在单个 SQL Server 实例内,Service Broker 提供了一个功能强大的异步编程模型。数据库应用程序通常使用异步编程来缩短交互式响应时间,并增加应用程序总吞吐量。

在多个SQL Server实例之间Service Broker 还可以提供可靠的消息传递服务。Service Broker 可帮助开发人员通过称为服务的独立、自包含的组件来编写应用程序。需要使用这些服务中所包含功能的应用程序可以使用消息来与这些服务进行交互。Service Broker 使用 TCP/IP 在实例间交换消息。Service Broker 中所包含的功能有助于防止未经授权的网络访问,并可以对通过网络发送的消息进行加密。

3.2 具体的实现演示

在这一小节里,主要是通过一个完整的数据同步的流程向大家演示,如何实现同一个数据库实例不同数据库的数据同步。关于不同的数据库实例间的数据库的数据同步整体上跟同一个实例的数据库同步是一样的,只不过在不同的数据库实例间同步时还需启用传输安全、对话安全,创建路由、远程服务绑定等额外的操作。

这里边用到了大量的SQL Server XML的东西,如果有不理解的地方可以参考以下链接:http://www.cnblogs.com/Olive116/p/3355840.html

这是我在做技术准备时,自己的一点学习记录。

下面就是具体的实现步骤:

3.2.1为数据库启动Service Broker活动

这一步主要是用来对要进行数据同步的数据启用Service Broker 活动,并且授信。

[csharp] view plain copy

  1. USE master
  2. GO
  3. –如果数据库DBFrom、DBTo不存在,则创建相应的数据库
  4. IF NOT EXISTS (SELECT name FROM sys.databases WHERE name =‘DBFrom’)
  5. CREATE DATABASE DBFrom
  6. GO
  7. IF NOT EXISTS (SELECT name FROM sys.databases WHERE name =‘DBTo’)
  8. CREATE DATABASE DBTo
  9. GO
  10. –分别为该数据库启用Service Broker活动并且授权信任
  11. ALTER DATABASE DBFrom SET ENABLE_BROKER
  12. GO
  13. ALTER DATABASE DBFrom SET TRUSTWORTHY ON
  14. GO
  15. ALTER AUTHORIZATION ON DATABASE::DBFrom To sa
  16. GO
  17. ALTER DATABASE DBTo SET ENABLE_BROKER
  18. GO
  19. ALTER DATABASE DBTo SET TRUSTWORTHY ON
  20. GO
  21. ALTER AUTHORIZATION ON DATABASE::DBTo TO sa
  22. GO

 

3.2.2 创建数据库主密匙

这一步主要用来创建数据库主密匙,上边有提到Service Broker可以对要发送的消息进行加密。

[csharp] view plain copy

  1. Use DBFrom
  2. go
  3. create master key
  4. encryption by password=‘pass@word1’
  5. go
  6. Use DBTo
  7. go
  8. create master key
  9. encryption by password=‘pass@word1’
  10. go

 

3.2.3 创建消息类型、协定

这里主要用来创建消息类型和消息协定,源数据库和目标数据库的消息类型和协定都要一致。

[csharp] view plain copy

  1. Use DBFrom
  2. go
  3. –数据同步—消息类型
  4. create message type [http://oa.founder.com/Data/Sync]
  5. validation=well_formed_xml
  6. go
  7. –数据同步–错误反馈消息类型
  8. create message type [http://oa.founder.com/Data/Sync/Error]
  9. validation=well_formed_xml
  10. go
  11. –数据同步协议
  12. create contract[http://oa.founder.com/Data/SyncContract]
  13. (
  14. [http://oa.founder.com/Data/Sync]
  15. sent by initiator,
  16. [http://oa.founder.com/Data/Sync/Error]
  17. sent by target
  18. )
  19. go
  20. Use DBTo
  21. go
  22. –数据同步—消息类型
  23. create message type [http://oa.founder.com/Data/Sync]
  24. validation=well_formed_xml
  25. go
  26. –数据同步–错误反馈消息类型
  27. create message type [http://oa.founder.com/Data/Sync/Error]
  28. validation=well_formed_xml
  29. go
  30. –数据同步协议
  31. create contract[http://oa.founder.com/Data/SyncContract]
  32. (
  33. [http://oa.founder.com/Data/Sync]
  34. sent by initiator,
  35. [http://oa.founder.com/Data/Sync/Error]
  36. sent by target
  37. )
  38. Go

 

创建过之后效果如下图:

3.2.4 创建消息队列

这里主要用来创建消息队列,源数据库和目标数据库都要创建,队列名字可以自主命名。

[csharp] view plain copy

  1. use DBFrom
  2. go
  3. create queue [DBFrom_DataSyncQueue]
  4. with status=on
  5. go
  6. use DBTo
  7. go
  8. create queue [DBFrom_DataSyncQueue]
  9. with status=on
  10. go

 

创建之后效果如下图:

3.2.5 创建数据同步服务

这里我们通过利用上边创建的消息协定和消息队列来创建数据同步的服务。

[csharp] view plain copy

  1. use DBFrom
  2. go
  3. create service [http://oa.founder.com/DBFrom/Data/SyncService]
  4. on queue dbo.[DBFrom_DataSyncQueue]([http://oa.founder.com/Data/SyncContract])
  5. go
  6. –数据同步服务
  7. use DBTo
  8. go
  9. create service [http://oa.founder.com/DBTo/Data/SyncService]
  10. on queue dbo.[DBFrom_DataSyncQueue]([http://oa.founder.com/Data/SyncContract])
  11. go

 

创建后效果如下图:

3.2.6 在源数据库上创建服务配置列表

这里需要在源数据库上创建一个服务配置列表,主要用来保存之前创建过的服务名称,本例只是用来演示,所以只创建了一个服务,只能是同步一个数据表,如果有多个数据表需要同步,则需创建多个服务,所以这里创建一个服务配置列表,用来存储多个服务的服务名称。

需要注意的是,下面的脚本在执行完创建表的操作之后又插入了一条数据,也就是上边我们创建的服务名,如果有多个服务的话,依次插入该表即可。

[csharp] view plain copy

  1. use DBFrom
  2. go
  3. –同步数据–目标服务配置
  4. create table SyncDataFarServices
  5. (
  6. ServiceID uniqueidentifier,
  7. ServiceName nvarchar(256)
  8. )
  9. go
  10. –将上边创建的服务名,插入此表中
  11. insert into SyncDataFarServices (ServiceID,ServiceName)
  12. values
  13. (NEWID(),‘http://oa.founder.com/DBTo/Data/SyncService’)
  14. go

 

效果如下图:

 

3.2.7 发送数据同步消息

这里创建了一个存储过程主要用来发送同步消息,该消息内容主要包括操作类型、主键、表名、正文内容,分别对应@DMLType,@PrimaryKeyField,@TableName,@XMLData。然后通过创建一个游标来条的读取上边创建的服务列表中的列表信息,向不同的服务发送消息。

[csharp] view plain copy

  1.  Use DBFrom
  2. go
  3. –发送同步数据消息
  4. Create procedure UP_SyncDataSendMsg
  5. (
  6. @PrimaryKeyField nvarchar(128),
  7. @TableName nvarchar(128),
  8. @DMLType char(1),
  9. @XMLData xml
  10. )
  11. as
  12. begin
  13.    SET @XMLData.modify(‘insert <DMLType>{sql:variable(“@DMLType”)}</DMLType>  as first into /’);
  14.     SET @XMLData.modify(‘insert <PrimaryKeyField>{sql:variable(“@PrimaryKeyField”)}</PrimaryKeyField>  as first into /’);
  15.     SET @XMLData.modify(‘insert <Table>{sql:variable(“@TableName”)}</Table> as first into /’);
  16.     DECLARE FarServices CURSOR FOR SELECT ServiceName FROM SyncDataFarServices;
  17.    open FarServices
  18.       declare @FarServiceName nvarchar(256);
  19.       fetch FarServices into @FarServiceName;
  20.       while @@FETCH_STATUS=0
  21.         begin
  22.            begin Transaction
  23.              declare @Conv_Handler uniqueidentifier
  24.              begin DIALOG conversation @Conv_Handler –开始一个会话
  25.              from service [http://oa.founder.com/DBFrom/Data/SyncService]
  26.              to service @FarServiceName
  27.              on contract [http://oa.founder.com/Data/SyncContract];          
  28.              send on conversation @Conv_Handler
  29.              Message type [http://oa.founder.com/Data/Sync](@XMLData);
  30.              fetch FarServices into @FarServiceName;
  31.              commit;
  32.         end
  33.     close FarServices;
  34.     deallocate FarServices;
  35. end
  36. go

 

3.2.8 创建数据同步异常信息记录表

这里创建该表主要用来记录在数据同步过程中出现的异常信息。

[csharp] view plain copy

  1. use DBFrom
  2. go
  3. create Table dbo.SyncException
  4. (
  5. ErrorID uniqueidentifier,
  6. ConversationHandleID uniqueidentifier,
  7. ErrorNumber int,
  8. ErrorSeverity int,
  9. ErrorState int,
  10. ErrorProcedure nvarchar(126),
  11. ErrorLine int,
  12. ErrorMessage nvarchar(2048),
  13. MessageContent nvarchar(max),
  14. CreateDate DateTime
  15. )
  16. go
  17. –修改异常信息记录表
  18. alter table dbo.SyncException
  19. add
  20. PrimaryKeyField nvarchar(128),
  21. TableName nvarchar(128),
  22. DMLType char(1),
  23. DBName nvarchar(128)
  24. Go

 

效果如下图:

3.2.9 数据同步反馈

这里主要用来在源数据库中接收队列中的消息,将同时出错的信息,解析一下,然后插入到异常信息记录表里边。

–数据同步回馈

[csharp] view plain copy

  1. use DBFrom
  2. go
  3. create procedure UP_SyncDataFeedback
  4. as
  5. begin
  6. set nocount on
  7. –会话变量声明
  8. declare @ConversationHandle uniqueidentifier;–会话句柄
  9. declare @Msg_Body nvarchar(max);
  10. declare @Msg_Type_Name sysname;
  11. –变量赋值
  12. while(1=1)
  13. begin
  14.   begin transaction
  15.      –从队列中接收消息
  16.      waitfor
  17.      (
  18.         receive top(1)
  19.         @Msg_Type_Name=message_type_name,
  20.         @ConversationHandle=[conversation_handle],
  21.         @Msg_Body=message_body
  22.         from dbo.[DBFrom_DataSyncQueue]
  23.      ),timeout 1000
  24.      –如果接收到消息处理,否则跳过
  25.      if(@@ROWCOUNT<=0)
  26.         break;
  27.      if @Msg_Type_Name=‘http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog’
  28.          end conversation @ConversationHandle;
  29.      else if @Msg_Type_Name=‘http://oa.founder.com/Data/Sync/Error’
  30.          begin
  31.              declare @DataSource xml;
  32.              set @DataSource=Convert(xml,@Msg_Body);
  33.              insert into dbo.SyncException(ErrorID,ConversationHandleID,ErrorNumber,ErrorSeverity,ErrorState,ErrorProcedure,ErrorLine,ErrorMessage,
  34. PrimaryKeyField,TableName,DMLType,MessageContent,DBName,CreateDate)
  35.              select
  36.              NEWID(),@ConversationHandle,
  37.              T.c.value(‘./@ErrNumber’,‘INT’),
  38.              T.c.value(‘./@ErrSeverity’,‘INT’),
  39.              T.c.value(‘./@ErrState’,‘INT’),
  40.              T.c.value(‘./@ErrProcedure’,‘Nvarchar(126)’),
  41.              T.c.value(‘./@ErrLine’,‘INT’),
  42.              T.c.value(‘./@ErrMessage’,‘nvarchar(2048)’),
  43.              T.c.value(‘./@PrimaryKeyField’,‘nvarchar(128)’),
  44.              T.c.value(‘./@TableName’,‘nvarchar(128)’),
  45.              T.c.value(‘./@DMLType’,‘char(1)’),
  46.              T.c.value(‘./@MessageContent’,‘nvarchar(max)’),
  47.             T.c.value(‘./@DBName’,‘nvarchar(128)’),
  48.              GETDATE()
  49.              from @DataSource.nodes(‘/row’as T(c);
  50.           end
  51.      else if @Msg_Type_Name=‘http://schemas.microsoft.com/SQL/ServiceBroker/Error’
  52.          end conversation @ConversationHandle;
  53.   commit Transaction;
  54. end
  55. end
  56. commit;
  57. go

 

3.2.10对Service Broker队列使用内部激活,并指定将调用的存储过程

这里主要用来激活源数据库的消息队列,并为其指定调用的存储过程,即上边3.2.9 中创建的存储过程。

[csharp] view plain copy

  1. –对Service Broker队列使用内部激活,并指定将调用的存储过程
  2. use DBFrom
  3. go
  4. alter queue dbo.DBFrom_DataSyncQueue with activation
  5. (
  6.   status=on,
  7.   max_queue_Readers=1,
  8.   procedure_name=UP_SyncDataFeedback,
  9.   execute as owner
  10. );
  11. Go

 

3.2.11 在源数据库中为需要同步的数据表创建触发器

这里就以用户表为例,具体操作如下,这里通过查询系统的Inserted和Deleted临时表来判断执行同步的操作类型是更新(U)、新增(A)还是删除(D),最后调用3.2.7 中创建的存储过程来对数据进行处理并发送。

[csharp] view plain copy

  1. use DBFrom
  2. Go
  3. –用户信息同步
  4. Create Trigger UT_DataSync_Users
  5. on dbo.Org_Users
  6. after insert,update,delete
  7. as
  8. set nocount on ;
  9. –变量声明
  10. declare @PrimaryKeyField nvarchar(128),@TableName nvarchar(128),@DMLType char(1);
  11. declare @InsertCount int ,@DeleteCount int ;
  12. declare @XMLData xml;
  13. –变量赋值
  14. set @PrimaryKeyField=‘ID’ –组合主键,多个主键使用“,”隔开
  15. set @TableName=‘Org_Users’
  16. set @InsertCount=(select COUNT(*) from inserted)
  17. set @DeleteCount=(select COUNT(*) from deleted)
  18. if @InsertCount=@DeleteCount and @InsertCount<>0  —-Update
  19.   begin
  20.   select @XMLData=(select * from inserted For xml raw,binary base64,ELEMENTS XSINIL);
  21.   set @DMLType=‘U’;
  22.   end
  23. else if(@InsertCount<>0 and @DeleteCount=0) —-Insert
  24.   begin
  25.   select @XMLData=(select * from inserted for xml raw ,Binary base64,ELEMENTS XSINIL)
  26.   set @DMLType=‘A’;
  27.   end
  28. else—-Delete
  29.   begin
  30.       select @XMLData=(select *from deleted for xml raw,binary base64,ELEMENTS XSINIL)
  31.       set @DMLType=‘D’;
  32.   end
  33.  if(@XMLData is not null)
  34.  begin
  35.      exec UP_SyncDataSendMsg @PrimaryKeyField,@TableName,@DMLType,@XMLData;
  36.  end
  37.  go

 

3.2.12 目标数据库中创建,字符分割函数

该函数主要是用来进行字符分割,用来处理主键有多个字段的情况。

–目标数据库

[csharp] view plain copy

  1. use DBTo
  2. go
  3. –转换用‘,’分割的字符串@str
  4. create Function dbo.uf_SplitString
  5. (
  6. @str nvarchar(max),
  7. @Separator nchar(1)=‘,’
  8. )
  9. returns nvarchar(2000)
  10. as
  11. begin
  12.    declare @Fields xml;–结果字段列表
  13.   declare @Num int;—–记录循环次数
  14.   declare @Pos int;—–记录开始搜索位置
  15.    declare @NextPos int;–搜索位置临时变量
  16.    declare @FieldValue nvarchar(256);–搜索结果
  17.    set @Num=0;
  18.    set @Pos=1;
  19.    set @Fields=CONVERT(xml,‘<Fields></Fields>’);
  20.    while (@Pos<=LEN(@Str))
  21.       begin
  22.         select @NextPos=CHARINDEX(@Separator,@Str,@Pos)
  23.         if(@NextPos=0 OR @NextPos is null)
  24.             select @NextPos=LEN(@Str)+1;
  25.         select @FieldValue=RTRIM(ltrim(substring(@Str,@Pos,@NextPos-@Pos)))
  26.         select @Pos=@NextPos+1
  27.         set @Num=@Num+1;
  28.         if @FieldValue<> 
  29.            begin
  30.              set @Fields.modify(‘insert <Field>{sql:variable(“@FieldValue”)}</Field> as last into /Fields[1]’);
  31.            end
  32.       end
  33.       return Convert(nvarchar(2000),@Fields);
  34. end
  35. go

 

3.2.13 将解析过的消息信息,根据操作类型的不同同步到数据表中

这是所有的数据同步中最关键也是最复杂的一步了,在整个开发的过程中,大部分时间都花在这上边了,具体的操作都在下面解释的很清楚了。

[csharp] view plain copy

  1. –将XML数据源中的数据同步到数据表中(包括增删改)
  2. Use DBTo
  3. go
  4. create function dbo.UF_XMLDataSourceToSQL
  5. (
  6.    @DataSource XML,–数据源
  7.    @TableName varchar(128),–同步数据表名称
  8.    @PrimaryKeyField varchar(128),–需要同步的表的主键,主键为多个时用‘,’隔开
  9.    @DMLType char(1) –A:新建;U:编辑;D:删除
  10. )
  11. returns nvarchar(4000)
  12. as
  13. begin
  14.     –变量声明及数据初始化
  15.     –声明数据表@TableName列Column相关信息变量
  16.     declare @ColumnName nvarchar(128),@DataType nvarchar(128),@MaxLength int;
  17.     –声明用于拼接SQL的变量
  18.    declare @FieldsList nvarchar(4000),@QueryStatement nvarchar(4000);
  19.     declare @Sql nvarchar(4000);
  20.     declare @StrLength int;
  21.     –变量初始化
  22.     set @FieldsList=‘  ‘;–初始化变量不为null,否则对变量使用‘+=’操作符无效
  23.   set @QueryStatement=‘  ‘;
  24.     –主键信息,根据参数求解如:<Fields><Field>ID1</Field><Field>ID2</Field></Fields>
  25.     declare @PKs xml;
  26.     –当前字段是否主键-在‘更新’,‘删除’同步数据时使用
  27.     declare @IsPK nvarchar(128);
  28.     –初始化游标–游标内容包括目标数据表TableName列信息
  29.     DECLARE ColumnNameList CURSOR FOR SELECT COLUMN_NAME,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME=@TableName AND
  30. DATA_TYPE<>‘xml’;
  31.    –数据处理
  32.     if @DMLType=‘A’–插入数据
  33.        begin
  34.           open ColumnNameList
  35.              fetch ColumnNameList into @ColumnName,@DataType,@MaxLength;
  36.              while @@FETCH_STATUS=0
  37.                begin
  38.                –判断数据源列中是否存在属性:@ColumnName
  39.                –判断数据源列中是否存在–元素:@ColumnName
  40.                If @DataSource.exist(‘/row/*[local-name()=sql:variable(“@ColumnName”)]’)=1
  41.                  begin
  42.                     –拼接SQL
  43.                     set @FieldsList+=(@ColumnName+‘,’);
  44.                    set @QueryStatement+=(‘T.c.value(‘‘(./’+@ColumnName+‘[not(@xsi:nil)])[1]’‘,’+@DataType);–元素读取(包含空值情况)
  45.                     if @MaxLength is not null and @MaxLength<>-1
  46.                        begin
  47.                           set @QueryStatement+=‘(‘+CONVERT(nvarchar,@MaxLength)+‘)’;
  48.                         end
  49.                     else if @MaxLength=-1 and @DataType<>‘xml’–已调整
  50.                        begin
  51.                           set @QueryStatement+=‘(MAX)’;
  52.                        end
  53.                     set @QueryStatement+=(‘) as ‘+@ColumnName+‘,’);
  54.                  end
  55.                  fetch ColumnNameList into @ColumnName,@DataType,@MaxLength
  56.                end
  57.         close ColumnNameList;
  58.         deallocate ColumnNameList;
  59.         set @StrLength=LEN(@FieldsList);
  60.         –去掉@FieldsList结尾的’,’
  61.        set @FieldsList=SUBSTRING(@FieldsList,1,@StrLength-1);
  62.         set @StrLength=LEN(@QueryStatement);
  63.         –去掉@QueryStatement结尾的’,’
  64.         set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);
  65.               set @Sql=N‘insert into ‘+@TableName+‘(‘+@FieldsList+‘) select ‘+@QueryStatement+‘ from @DataSource.nodes(‘‘/row/’‘) as T(c)’;
  66.        end
  67.     else if @DMLType=‘U’–更新数据
  68.         begin
  69.            –更新语句where 后的条件表达式
  70.            declare @Condition nvarchar(1000);
  71.            set @Condition=‘  ‘;
  72.            set @PKs=CONVERT(xml,dbo.uf_SplitString(@PrimaryKeyField,‘,’));
  73.            Open ColumnNameList
  74.                 fetch ColumnNameList into @ColumnName,@DataType,@MaxLength;
  75.                while @@FETCH_STATUS=0
  76.                 begin
  77.                 –判断数据源列中是否存在元素:@ColumnName
  78.                   if @DataSource.exist(‘/row/*[local-name()=sql:variable(“@ColumnName”)]’)=1
  79.                   begin
  80.                      set @IsPK=null;
  81.                      SELECT @IsPk=Fs.F FROM (SELECT T.c.value(‘.[text()]’,‘Nvarchar(128)’) AS F FROM @PKs.nodes(‘/Fields/Field’) AS T(c))Fs Where Fs.F=@ColumnName
  82.                      if @IsPK is null or @IsPK=
  83.                      begin
  84.                        –非主键,更新字段值
  85.                        set @FieldsList+=(@ColumnName+‘=Source.’+@ColumnName+‘,’);
  86.                      end
  87.                      else
  88.                      begin
  89.                         –主键,作为要更新条件
  90.                        set @Condition+=@TableName+‘.’+@ColumnName+‘=Source.’+@ColumnName+‘ And ‘;
  91.                      end
  92.                      –XML查询
  93.                      set @QueryStatement+=(‘T.c.value(‘‘(./’+@ColumnName+‘[not(@xsi:nil)])[1]’‘,’+@DataType);–元素读取(包含空值情况)
  94.                      if @MaxLength is not null and @MaxLength<>-1
  95.                         begin
  96.                            set @QueryStatement+=‘(‘+CONVERT(nvarchar,@MaxLength)+‘)’;
  97.                         end
  98.                      else if @MaxLength=-1 and @DataType<>‘xml’
  99.                         begin
  100.                            set @QueryStatement+=‘(max)’;
  101.                         end
  102.                       set @QueryStatement+=(‘) as ‘+@ColumnName+‘,’);
  103.                   end
  104.                   fetch ColumnNameList Into @ColumnName,@DataType,@MaxLength
  105.                 end
  106.             close ColumnNameList;
  107.             Deallocate ColumnNameList;
  108.             –去掉@FieldsList结尾的‘,’
  109.             set @StrLength=LEN(@FieldsList);
  110.             set @FieldsList=SUBSTRING(@FieldsList,1,@StrLength-1);
  111.          –去掉@QueryStatement结尾的‘,’
  112.          set @StrLength=LEN(@QueryStatement);
  113.          set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);
  114.          –去掉@Condition结尾的‘and’
  115.          set @StrLength=LEN(rtrim(@Condition));
  116.          set @Condition=SUBSTRING(rtrim(@Condition),1,@StrLength-3);
  117.             set @Sql=N‘USE DBTo ; update ‘+@TableName+‘ set ‘+@FieldsList+‘ from (select ‘+@QueryStatement+’
  118.             from @DataSource.nodes(/rowas T(c)) Source where ‘+@Condition;
  119.     end
  120.     else if @DMLType=‘D’ –删除数据
  121.        begin
  122.          –更新语句where后的条件表达式
  123.          declare @LinkField nvarchar(1000);
  124.          set @LinkField=‘  ‘;
  125.          set @PKs=CONVERT(xml,dbo.uf_SplistString(@PrimaryKeyField,‘,’));
  126.          open ColumnNameList
  127.             fetch ColumnNameList into @ColumnName,@DataType,@MaxLength;
  128.             while @@FETCH_STATUS=0
  129.             begin
  130.             if @DataSource.exist(‘row/*[local-name()=sql:variable(“@ColumnName”)]’)=1
  131.              begin
  132.               set @IsPK=null;–初始化
  133.               –当前字段是否为主键
  134.               select @IsPK=Fs.F from (select T.c.value(‘.[text()]’,‘nvarchar(128)’as F from @PKs.nodes(‘/Fields/Field’as T(c))Fs where Fs.F=@ColumnName
  135.               –主键
  136.               if @IsPK is not null and @IsPK<>
  137.               begin
  138.                  –主键删除条件
  139.                  set @LinkField+=‘Target.’+@ColumnName+‘=Source.’+@ColumnName+‘ And ‘;
  140.                  –XML 查询
  141.                  set @QueryStatement+=(‘T.c.value(‘‘(./’+@ColumnName+‘[not(@xsi:nil)])[1]’‘,’+@DataType);–元素读取(包含空值情况)
  142.                 if(@MaxLength is not null and @MaxLength<>-1)
  143.                    begin
  144.                       set @QueryStatement+=‘(‘+CONVERT(nvarchar,@MaxLength)+‘)’;
  145.                    end
  146.                 else if @MaxLength=-1 and @DataType<>‘xml’
  147.                    begin
  148.                    set @QueryStatement+=‘(max)’;
  149.                   end
  150.                 set @QueryStatement+=(‘) as ‘+@ColumnName+‘,’);
  151.               end
  152.              end
  153.             fetch ColumnNameList into @ColumnName,@DataType,@MaxLength
  154.             end
  155.             close ColumnNameList;
  156.             deallocate ColumnNameList;
  157.             –去除@QueryStateMent结尾的‘,’
  158.             set @StrLength=LEN(@QueryStatement);
  159.             set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);
  160.             –去除@LinkField 结尾的’Add‘
  161.             set @StrLength=LEN(rtrim(@LinkField));
  162.             set @LinkField=SUBSTRING(rtrim(@LinkField),1,@StrLength-3);
  163.             set @Sql=N‘Delete from ‘+@TableName+‘ from ‘+@TableName+‘ as Target inner join (select ‘+@QueryStatement+ ‘ from @DataSource.nodes(‘‘/row’‘) as T(c))
  164.  Source on ‘+@LinkField;
  165.      end
  166.          Return @Sql–‘hello’
  167. end
  168. go

 

3.2.14 解析并处理从队列中读取的消息

这里主要用来读取队列中的消息,并将消息进行处理,最终处理成一定的格式,并调用3.2.13中的存储过程,将数据同步到数据库中。

[csharp] view plain copy

  1. –将数据同步到数据表中
  2. create procedure UP_SyncDataToTable
  3. as
  4. begin
  5. set nocount on
  6. –会话变量声明
  7. declare @ConversationHandle uniqueidentifier;–会话句柄
  8. declare @Msg_Body nvarchar(max);
  9. declare @Msg_Type_Name sysname;
  10. declare @ErrorNumber int ;
  11. –变量赋值
  12. while(1=1)
  13.   begin
  14.     begin transaction
  15.        –从队列中接收消息
  16.        waitfor
  17.        (
  18.          receive top(1)
  19.          @Msg_Type_Name=message_type_name,
  20.          @ConversationHandle=[conversation_handle],
  21.          @Msg_Body=message_body
  22. —         from dbo.[DBTo_DataSyncQueue]
  23.          from dbo.[DBFrom_DataSyncQueue]
  24.        ),timeout 500
  25.        –如果接收到消息-处理,否则跳过
  26.        if @@ROWCOUNT<=0
  27.          begin
  28.            break;
  29.         end
  30.        if @Msg_Type_Name=‘http://oa.founder.com/Data/Sync’
  31.        begin
  32.          –声明变量
  33.          declare @DMLType char(1);
  34.          declare @PrimaryKeyField nvarchar(128),@TableName nvarchar(128),@Sql nvarchar(4000);
  35.          declare @DataSource xml
  36.         –受影响的行数
  37.          declare @EffectRowCount int;
  38.         declare @ErrMsg xml;
  39.             begin try
  40.                –变量赋值
  41.                set @DataSource=convert(xml,@Msg_Body);–数据源
  42.                set @PrimaryKeyField=@DataSource.value(‘(/PrimaryKeyField)[1][text()]’,‘nvarchar(128)’);–主键列表
  43.                set @TableName=@DataSource.value(‘(/Table)[1][text()]’,‘nvarchar(128)’);–操作数据表
  44.                set @DMLType=@DataSource.value(‘/DMLType[1][text()]’,‘char(1)’);–操作类型
  45.                set @Sql=dbo.UF_XMLDataSourceToSQL(@DataSource,@TableName,@PrimaryKeyField,@DMLType);
  46.                exec sp_executesql @Sql,
  47.                N‘@DataSource XML’,
  48.                @DataSource;
  49.             end try
  50.             begin catch
  51.                declare @DBName nvarchar(128)
  52.                select @DBName=Name from master..SysDataBases where dbid=(select dbid from master..sysprocesses where spid=@@SPID)
  53.                set @ErrorNumber=ERROR_NUMBER();
  54.                set @ErrMsg=(select ERROR_NUMBER() as ErrNumber,
  55.                                 ERROR_SEVERITY() as ErrSeverity,
  56.                                 ERROR_STATE() as ErrState,
  57.                                ERROR_PROCEDURE() as ErrProcedure,
  58.                                 ERROR_LINE() as ErrLine,
  59.                                 ERROR_MESSAGE() as ErrMessage,
  60.                                 @PrimaryKeyField as PrimaryKeyField,
  61.                                 @TableName as TableName,
  62.                                 @DMLType as DMLType,
  63.                                 @Msg_Body as MessageContent,
  64.                                 @DBName as DBName
  65.                            for XML raw);
  66.                   –GOTO 错误处理标签
  67.                   goto Err_Handle;
  68.             end catch
  69.             –结束会话
  70.             End Conversation @ConversationHandle
  71.             if @ErrorNumber is not null
  72.             begin
  73.                –错误处理区域
  74.                Err_Handle:
  75.                   if @ErrMsg is not null
  76.                   begin
  77.                     declare @test nvarchar(128);
  78.                     –发送失败消息
  79.                     send on conversation @ConversationHandle
  80.                     message type [http://oa.founder.com/Data/Sync/Error](@ErrMsg)
  81.                   end
  82.                   –结束会话
  83.                end conversation @ConversationHandle
  84.                —break;
  85.                –回滚–不可回滚,否则将无法发送失败消息
  86.                –GoTO  Err_Lab;
  87.             end
  88.         end
  89.     commit transaction
  90.        end
  91. end
  92. go

 

3.2.15 对目标数据库的消息队列进行内部激活

这里主要是用来激活目标数据库的消息队列,主要用来实现数据的同步以及同步出错的错误信息的反馈。

[csharp] view plain copy

  1. –对Service Broker队列使用内部激活,并指定将要调用的存储过程
  2. use DBTo
  3. go
  4. –alter Queue dbo.[DBTo_DataSyncQueue] with activation
  5. alter Queue dbo.[DBFrom_DataSyncQueue] with activation
  6. (
  7.   status=on,
  8.   max_queue_readers=1,
  9.   Procedure_name=UP_SyncDataToTable,
  10.   Execute as self
  11. )
  12. Go

 

完成以上这些步骤以后,就可以实现同一数据库实例上两个不同的数据库之间的数据同步。即如果DBFrom数据库中的Org_Users中的某一条信息发生变化,会马上的自动同步到DBTo数据库中的Org_Users 表。如果是想要实现不同的数据库实例间的数据库的表的同步,则可以参考以下链接:

http://www.cnblogs.com/downmoon/archive/2011/05/05/2037830.html

在创建启用传输安全、对话安全,创建路由、远程服务绑定等额外的操作之后,剩下的操作跟在同一数据库实例中的操作是一样的。

此外,本文还参考了如下的链接:

http://www.cnblogs.com/downmoon/archive/2011/04/05/2005900.html

希望可以给大家一些启发和帮助。具体的源码有兴趣的朋友可以留下邮箱。

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.