Oracle数据库细粒度审计(Fine-Grained-Audit)应用

Oracle数据库细粒度审计(Fine-Grained-Audit)应用

设计目标:
在实际的业务场景中,经常有审计“某人、某时间段、查看了某条敏感数据”这样的需求,Oracle本身提供了DB Vault、Audit Vault组件。
本文讨论使用细粒度审计,实现医院“防统方”功能:
背景: 医生开药后会从对应厂商医药代表处收取回扣,而收取回扣,需要统计处方中的药品、数量、医生信息。现阶段“纪委”部门严禁“统方”——对处方用药进行统计——操作。
期望实现对“可疑操作”的审计记录功能,后续对“可疑”记录进行进一步分析,进而准确确认是否为“统方”操作。
所以,技术实现上分两个阶段:
可疑操作记录
可疑操作分析
架构设计应具备可扩展性:
规则可定义
能够适用其他“敏感数据”审计需求

阶段一: 可疑操作记录

概述: 通过Oracle FGA,记录可疑操作。
FGA技术背景:
DBMS_FGA.ADD_POLICY(
object_schema VARCHAR2,
object_name VARCHAR2,
policy_name VARCHAR2,
audit_condition VARCHAR2,
audit_column VARCHAR2,
handler_schema VARCHAR2,
handler_module VARCHAR2,
enable BOOLEAN,
statement_types VARCHAR2,
audit_trail BINARY_INTEGER IN DEFAULT,
audit_column_opts BINARY_INTEGER IN DEFAULT);
Oracle FGA核心的操作就是这个存储过程的调用:
我们需要指定object_schema, object_name来限定是对哪个表(对象)来做审计;
使用audit_condition(SQL 逻辑表达式,但是有一定限制,如不能使用子查询,不能使用rownum伪列等)来限制“行”级别的敏感数据——只有符合此条件的,才触发审计操作。
使用audit_column来指定“敏感”列,只有涉及敏感列的操作才被审计,结合audit_column_opts,灵活定义敏感列(其实就是Any还是All的关系)
audit_trail来指定审计结果存放的“位置”,也决定审计记录的数据内容(特别是是否记录SQL,以及绑定变量),这里最需要特别指出的是,这里的设置不依赖于数据库级别的audit_trail 初始化参数。
handler_schema 和 handler_module指定“敏感”操作发生时,调用的存储过程,用来自定义一些处理操作。

本阶段的思路是,充分利用dbms_fga.add_policy中对敏感数据(行-Level、列-Level)的定义,尽可能的精确过滤出敏感数据——因为如果过滤效率低,那么会产生大量的audit trail记录,也给后续(第二阶段)的处理带来负担,同时还会降低业务正常操作的性能。
然后,使用handler_schema、handler_module指定的存储过程来进一步记录更多的session级别信息,便于后续(第二阶段)更加有效的分析“操作是否违规”。比如记录IP地址、应用程序中的用户名、应用程序的模块等。
刚才提到了,为了后续分析“操作是否违规”,需要提供IP,应用程序用户、应用程序模块等,这些信息需要使用“Database Session-Based Application Context”功能来存储、获取这些信息。Application Context内容相对简单,此处不做过多背景描述,后续实际操作中再讲解用法。

因为所有“统方”操作需要查询“处方”表,所以本例中,“处方表”——MZ_RECIPE就是我们需要审计的敏感数据所在表,其定义如下:
/*处方表mz_recipe*/

order_id number(10,0) not null, /*挂号id*/
recipe_type number(3,0) not null, /*处方类型:100=西药、200=中成药、300=草药、400=检查、500=治疗、600=物料、700=手术*/
recipe_id number(10,0) not null, /*处方号*/
group_id number(10,0)null, /*组号*/
item_id number(10,0) not null, /*项目序号*/
item_code number(10,0) not null, /*项目编码*/
item_name varchar2(255) not null, /*项目名称*/
unit_price number(10,4) not null, /*单价*/
num number(10,2) not null, /*总数量*/
nums_mean varchar(20) /*次/天显示字段*/
timesoneday number(5,0), /*次/天*/
numsonetime number(10,2), /*量/次*/
usage char(10), /*草药用法*/
method number(10,0) /*药品用法*/
Use_days number(10,0) /*给药天数*/
Times_count number(10,0) /*在院内执行次数*/
Item_kind number(5,0) /*处方类型1-药品,2-注射药品,3-检查治疗,4-草药, 5-注射附带项目,6-抽血附带项目 */
recipe_input_type number(10,0) /处方录入方式,0-收费划价,1-医生划价, 如果is_self=2 此处填写相关需发药的item_id /
recipe_dr char(8), /*处方医生*/
inputer char(8), /*录入员*/
recipe_date date, /*处方日期*/
item_state number(1), /*项目状态:0=待收费,1=已收费,2=已发药/已确认/已报告/完成,3-作废,5-已申请(预约)4-报告,9-系统作废*/
discount number(5,0), /*打折比例*(新加)/
card_id number(10,0), /*打折卡号*(新加)/
is_self number(5,0) /*0-正常,1-自备药,2-不发药*/
execute office char(6) /*开出科室*/
medi_office char(6), /*领药科室*/

我们先来看看,如何就这张表,定义其敏感列(Column-Level),经过分析(查看现有对这个表的操作sql语句),选择“num”字段作为这个表的敏感列——仅当操作(本例指Select)涉及此Column,我们才认为操作了敏感数据,需要审计。
定义敏感行(Row-Level),与医生讨论确认,recipe_type(处方类型)仅仅为“西药”、“中成药”才是我们关注的敏感数据(拿医药代表的回扣,只会针对西药、中成药),也就是说,操作(Select)的数据行,只有当recipe_type为100或者200,才触发我们的审计。
根据上述分析初步实施后,实际观测发现,仅仅定义上述“敏感”过滤条件,仍然有大量SQL操作被认为需要审计、记录。进一步与医院医生讨论,认为可以进一步增加一个“敏感行”定义——只有操作(Select)涉及到12小时前数据行时,才触发审计操作(医药代表发钱,一般每个月一次)。
所以我们的敏感数据定义为:
操作(Select)涉及“num”列
And 操作的数据行处方类型为西药或者中成药(recipe_type in (100,200))
And 操作的数据行,处方日期为12小时前(recipe_date < (sysdate - 1/2))

就目前为止,我们可以定义如下的审计策略(Policy):
begin
dbms_fga.add_policy(
object_schema => 'ORACLE', — 包含敏感数据表的schema
object_name => 'MZ_RECIPE', — 敏感数据所在表
policy_name => 'LG_FTF_TEST1', — Policy 名称
audit_condition => '(RECIPE_TYPE = 100 or RECIPE_TYPE = 200) AND RECIPE_DATE < (SYSDATE-1/2)', — 敏感行(ROW-Level)定义
statement_types => 'SELECT', — 操作
audit_column => 'NUM', — 敏感列(Column-Level)定义
handler_schema => 'ORACLE', /* 后面讨论 */
handler_module => ‘set_fga_ctx_pkg.handler_audit', /* 后面再讨论 */
enable => true
);
end;

// Disable : exec dbms_fga.disable_policy('ORACLE','MZ_RECIPE','LG_FTF_TEST1');

添加此策略后,一旦有“敏感数据”(符合Row-Level + Column-Level定义)被操作,将会在sys.fga_log$表中添加一行记录,所以,需要把fga_log$表存放到合适的表空间、并且保证充足的空闲空间,否则就会因为“审计”,影响“业务”。修改该表空间操作,有特定的Package完成,参看Oracle Security 手册,别手工去move表。
也可以查询dba_fga_audit_trail视图,这个视图的定义请参看Oracle Reference:
我们来简单介绍一下audit trail中几个我们关注的列:
sessionid,对应 v$session.audsid ;SCN 为查询时的scn号;sql_text 为应用程序执行的真实sql(可能使用了绑定变量);sql_bind(绑定变量信息);ENTRYID——这个不太好解释,简单来说(不是严格正确),sessionid + entryid 可以唯一定位一条audit trail记录。
但是,如果我们还希望存储其他的信息,显然fga_log$或者dba_fga_audit_trail不能提供,所以我们需要使用dbms_fga.add_policy的事件处理机制(使用handler_module,FGA记录audit trail的同时,调用此handler_module)。在handler_module指定的存储过程中,我们将fga_log$中的信息,再加上我们想存储的一些其他信息,存储在我们自己设计的一张表中。
/*
Audit Record Table
*/
CREATE TABLE sec_ftf_rcd
(
fgasid NUMBER(20),
entryid NUMBER(20),
audit_date DATE,
fga_policy VARCHAR2(30),
db_user VARCHAR(30),
os_user VARCHAR2(30),
authent_type VARCHAR2(30),
client_id VARCHAR2(100),
client_info VARCHAR2(64),
host_name VARCHAR2(54),
instance_id NUMBER(2),
ip VARCHAR2(30),
term VARCHAR2(30),
schema_owner VARCHAR2(20),
table_name VARCHAR2(30),
sql_text VARCHAR2(64),
SCN NUMBER(10),
appusercode varchar2(100),
appusername varchar2(100),
appmodule varchar2(255),
appmoduleparam varchar2(255)
)
TABLESPACE sysaux
PARTITION BY RANGE (audit_date)
(
PARTITION y14m01 VALUES LESS THAN
(TO_DATE('02/01/2014','mm/dd/yyyy')),
PARTITION y14m02 VALUES LESS THAN
(TO_DATE('03/01/2014','mm/dd/yyyy')),
PARTITION y14m03 VALUES LESS THAN
(TO_DATE('04/01/2014','mm/dd/yyyy')),
PARTITION y14m14 VALUES LESS THAN
(TO_DATE('05/01/2014','mm/dd/yyyy')),
PARTITION y14m05 VALUES LESS THAN
(TO_DATE('06/01/2014','mm/dd/yyyy')),
PARTITION y14m06 VALUES LESS THAN
(TO_DATE('07/01/2014','mm/dd/yyyy')),
PARTITION y14m07 VALUES LESS THAN
(TO_DATE('08/01/2014','mm/dd/yyyy')),
PARTITION y14m08 VALUES LESS THAN
(TO_DATE('09/01/2014','mm/dd/yyyy')),
PARTITION y14m09 VALUES LESS THAN
(TO_DATE('10/01/2014','mm/dd/yyyy')),
PARTITION y14m10 VALUES LESS THAN
(TO_DATE('11/01/2014','mm/dd/yyyy')),
PARTITION y14m11 VALUES LESS THAN
(TO_DATE('12/01/2014','mm/dd/yyyy')),
PARTITION def VALUES LESS THAN
(MAXVALUE)
);

这是我们自定义准备存储审计记录的表(其实应该再增加备用几个字段,将来肯定要扩展)。

/*
Key:
appusercode: Userid In App
appusername: Username In App
appModule: app module/window
appModuleParam: app module/window parameter

*/
create or replace
package body set_fga_ctx_pkg is

// 操作application context的存储过程
procedure store_sess_params(
param_key in varchar2,
param_value in varchar2,
param_comment in varchar2
)
is
begin
dbms_session.set_context('SEC_FTF_CTX',param_key,param_value);
end;

// FGA 的handler module
procedure handler_audit(
p_table_owner IN VARCHAR2,
p_table_name IN VARCHAR2,
p_fga_policy IN VARCHAR2
)
is
PRAGMA AUTONOMOUS_TRANSACTION;
l_fgasid NUMBER(20);
l_entryid NUMBER(20);
l_term VARCHAR2(2000);
l_db_user VARCHAR2(30);
l_os_user VARCHAR2(30);
l_authent_type VARCHAR2(2000);
l_client_id VARCHAR2(100);
l_client_info VARCHAR2(64);
l_host_name VARCHAR2(30);
l_instance_id NUMBER(2);
l_ip VARCHAR2(30);
l_sql_text VARCHAR2(4000);
l_scn NUMBER;
l_appusercode number;
l_appusername varchar2(100);
l_appModule varchar2(255);
l_appModuleParam varchar2(255);
begin
l_fgasid := sys_context('USERENV','SESSIONID');
l_entryid := sys_context('USERENV','ENTRYID');
l_term := sys_context('USERENV','TERMINAL');
l_db_user := sys_context('USERENV','SESSION_USER');
l_os_user := sys_context('USERENV','OS_USER');
l_authent_type := sys_context('USERENV','AUTHENTICATION_TYPE');
l_client_id := sys_context('USERENV','CLIENT_IDENTIFIER');
l_client_info := sys_context('USERENV','CLIENT_INFO');
l_host_name := sys_context('USERENV','HOST');
l_instance_id := sys_context('USERENV','INSTANCE');
l_ip := sys_context('USERENV','IP_ADDRESS');
l_sql_text := sys_context('USERENV','CURRENT_SQL');
l_scn := SYS.DBMS_FLASHBACK.get_system_change_number;
l_appusercode := sys_context('SEC_FTF_CTX','appusercode');
l_appusername := sys_context('SEC_FTF_CTX','appusername');
l_appModule := sys_context('SEC_FTF_CTX','appModule');
l_appModuleParam := sys_context('SEC_FTF_CTX','appModuleParam');

insert into sec_ftf_rcd (
fgasid ,
entryid ,
audit_date ,
fga_policy ,
db_user ,
os_user ,
authent_type ,
client_id ,
client_info ,
host_name ,
instance_id ,
ip ,
term ,
schema_owner ,
table_name ,
sql_text ,
SCN ,
appusercode ,
appusername ,
appmodule ,
appmoduleparam
)
values (
l_fgasid,
l_entryid,
sysdate,
p_fga_policy,
l_db_user,
l_os_user,
l_authent_type,
l_client_id,
l_client_info,
l_host_name,
l_instance_id,
l_ip,
l_term,
p_table_owner,
p_table_name,
'Select lsqltext From sys.fga_log$ where entryid = '||l_entryid||' AND sessionid = '||l_fgasid,
l_scn,
l_appusercode,
l_appusername,
l_appModule,
l_appModuleParam
)
;
commit;
exception
when others
then
rollback;
end;
end set_fga_ctx_pkg;

// package
create or replace
package set_fga_ctx_pkg is
procedure store_sess_params(
param_key in varchar2,
param_value in varchar2,
param_comment in varchar2
)
;
procedure handler_audit(
p_table_owner IN VARCHAR2,
p_table_name IN VARCHAR2,
p_fga_policy IN VARCHAR2
);
end set_fga_ctx_pkg;

可以看到在package中,handler_audit是让Oracle FGA调用的——审计操作除了存储audit trail,再调用此存储过程(其三个参数是固定的);另外,我们希望能够获取诸如“应用程序用户”、“应用程序模块”等信息,这些信息需要应用程序提供方(HIS厂商)存储到application context中,我们这里提供了存储这些变量的procedure——store_sess_params。
为了使用application context,额外的设置:
create context SEC_FTF_CTX using set_fga_ctx_pkg; // 创建application context,并指定操作package。
阶段一完成。

阶段二: 进一步分析操作是否违规

阶段一完成后,我们记录了每一次“敏感数据”操作,记录操作的:时间,sql,绑定变量,ip,机器名称,操作时间,数据库用户名,业务系统中的用户名,业务系统程序模块……
对sql语句进行分析(语法Parse),我们可以获得sql的predicates(where条件),select 的列内容……
我们需要根据这些参数,定义“黑”、“白”规则,符合“黑”规则的即为“违规”操作,符合“白”规则则不是违规操作。
规则举例:
举例1. where条件中指定了病人信息 ( 病人ID = :PATIID)则操作不违规
举例2. IP地址不在指定列表,则操作违规
举例3. 使用特定 app user, 并且操作时间为非工作时间,则操作违规
……
白规则优先,黑规则按顺序检查……

SQL语句的解析,准备使用java语言实现,使用 www.sqlparser.com 的产品,或者开源产品。
规则定义应该是一个单独的B/S架构的程序,本阶段的分析操作如果效率很高,可以整合到阶段一的handler_module中,如果效率不足,则分布处理。

Leave Comment