欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

利用PG Notify与listen功能搭建简单消息队列指南

最编程 2024-07-23 12:38:41
...

Notify/listen简介

PostgreSQL 提供了客户端之间通过服务器端进行消息通信的机制,该机制通过 LISTEN 和 NOTIFY 命令实现。

相关命令:

  •  listen :监听消息通道;例:listen topic_a
  •  unlisten:取消先前的监听;例:unlisten topic_a
  •  notify:发送消息到消息通道中;例:notify topic_a, 'hello world'
  •  pg_notify():与notify相同的功能;例:select pg_notify('topic_a', 'hello world')
  •  pg_listening_channels():查看当前session已注册了哪些消息监听;例:select pg_listening_channels()

触发器实现消息通信

-- Create the demo table whose row changes are published as notifications.
-- The "info" column supplies the notification payload in the trigger
-- function fun_info(); "id" and "zhuangtai" are not read by it.
CREATE TABLE info (
    id        integer,
    info      character varying,
    zhuangtai integer
);

 

-- Create the trigger function.
--
-- fun_info(): row-level trigger function that publishes the new row's "info"
-- column as a NOTIFY payload.
--   * INSERT -> channel 'info'
--   * UPDATE -> channel 'info_a'
--   * any other operation -> no notification
--
-- Always returns NULL, so it is meant for AFTER triggers (where the return
-- value of a row-level trigger is ignored). Do not attach it to a BEFORE
-- row-level trigger: returning NULL there would suppress the row operation.
--
-- NOTE(review): pg_notify() raises an error when the payload exceeds the
-- server's limit (8000 bytes in the default configuration per the NOTIFY
-- docs), which would abort the triggering statement -- confirm that "info"
-- values stay below that limit.
CREATE OR REPLACE FUNCTION fun_info() RETURNS TRIGGER AS $$
BEGIN
    IF (TG_OP = 'INSERT') THEN
        PERFORM pg_notify('info', NEW.info);
    ELSIF (TG_OP = 'UPDATE') THEN
        PERFORM pg_notify('info_a', NEW.info);
    END IF;

    -- Single exit point: guarantees a RETURN on every path, so the function
    -- no longer fails with "control reached end of trigger procedure without
    -- RETURN" if it is ever fired for an operation other than INSERT/UPDATE.
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

 

-- Create the trigger.
-- Runs fun_info() once per affected row, after the row has been inserted or
-- updated in table "info". Despite its name, the trigger covers UPDATE as
-- well as INSERT.
-- To re-create it, drop the existing one first:
--   drop trigger tri_insert_info on info;
CREATE TRIGGER tri_insert_info
    AFTER INSERT OR UPDATE
    ON info
    FOR EACH ROW
    EXECUTE FUNCTION fun_info();

 

 

JDBC实现消息通信

 1 package test;
 2 
 3 import java.sql.*;
 4 
 5 public class NotificationTest {
 6 
 7     public static void main(String args[]) throws Exception {
 8         Class.forName("org.postgresql.Driver");
 9         String url = "jdbc:postgresql://127.0.0.1:5432/postgres";
10 
11         // Create two distinct connections, one for the notifier
12         // and another for the listener to show the communication
13         // works across connections although this example would
14         // work fine with just one connection.
15         Connection lConn = DriverManager.getConnection(url,"postgres","postgres");
16         Connection nConn = DriverManager.getConnection(url,"postgres","postgres");
17 
18         // Create two threads, one to issue notifications and
19         // the other to receive them.
20         Listener listener = new Listener(lConn);
21         Notifier notifier = new Notifier(nConn);
22         listener.start();
23         notifier.start();
24     }
25 
26 }
27 
28 class Listener extends Thread {
29 
30     private Connection conn;
31     private org.postgresql.PGConnection pgconn;
32 
33     Listener(Connection conn) throws SQLException {
34         this.conn = conn;
35         this.pgconn = (org.postgresql.PGConnection)conn;
36         Statement stmt = conn.createStatement();
37         stmt.execute("LISTEN mymessage");
38         stmt.close();
39     }
40 
41     public void run() {
42         while (true) {
43             try {
44                 // issue a dummy query to contact the backend
45                 // and receive any pending notifications.
46                 Statement stmt = conn.createStatement();
47                 ResultSet rs = stmt.executeQuery("SELECT 1");
48                 rs.close();
49                 stmt.close();
50 
51                 org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
52                 if (notifications != null) {
53                     for (int i=0; i<notifications.length; i++) {
54                         System.out.println("Got notification: " + notifications[i].getName());
55                          System.out.println("Got notification: " + notifications[i].getName() + " with payload: " + notifications[i].getParameter());
56                     }
57                 }
58 
59                 // wait a while before checking again for new
60                 // notifications
61                 Thread.sleep(500);
62             } catch (SQLException sqle) {
63                 sqle.printStackTrace();
64             } catch (InterruptedException ie) {
65                 ie.printStackTrace();
66             }
67         }
68     }
69 }
70 
71 class Notifier extends Thread {
72 
73     private Connection conn;
74 
75     public Notifier(Connection conn) {
76         this.conn = conn;
77     }
78 
79     public void run() {
80         while (true) {
81             try {
82                 Statement stmt = conn.createStatement();
83 //                stmt.execute("NOTIFY mymessage");
84                 stmt.execute("NOTIFY mymessage,'123'");
85                 System.out.println("111111 notification: ");
86                 stmt.close();
87                 Thread.sleep(2000);
88             } catch (SQLException sqle) {
89                 sqle.printStackTrace();
90             } catch (InterruptedException ie) {
91                 ie.printStackTrace();
92             }
93         }
94     }
95 }
View Code

 

更多JDBC链接

https://jdbc.postgresql.org/documentation/81/index.html