利用PG Notify与listen功能搭建简单消息队列指南
最编程
2024-07-23 12:38:41
...
Notify/listen简介
PostgreSQL提供了客户端之间通过服务器端进行消息通信的机制,这种机制通过listen和notify命令实现。
相关命令:
- listen :监听消息通道;例:listen topic_a
- unlisten:取消先前的监听;例:unlisten topic_a
- notify:发送消息到消息通道中;例:notify topic_a, 'hello world'
- pg_notify():与notify相同的功能;例:select pg_notify('topic_a', 'hello world')
- pg_listening_channels():查看当前session已注册了哪些消息监听;例:select pg_listening_channels()
触发器实现消息通信
-- Create the table whose inserts and updates are published as notifications.
CREATE TABLE info (
    id        int,
    info      varchar,
    zhuangtai int
);
-- Create the trigger function.
-- Publishes the new row's "info" column as the notification payload:
--   INSERT -> channel 'info'
--   UPDATE -> channel 'info_a'
CREATE OR REPLACE FUNCTION fun_info() RETURNS TRIGGER AS $$
BEGIN
    IF (TG_OP = 'INSERT') THEN
        PERFORM pg_notify('info', NEW.info);
    ELSIF (TG_OP = 'UPDATE') THEN
        PERFORM pg_notify('info_a', NEW.info);
    END IF;
    -- Single unconditional RETURN: a trigger function that reaches END without
    -- a RETURN raises an error, which the original did for any other TG_OP.
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Create the trigger.
-- To re-create it, drop the existing one first:
-- DROP TRIGGER tri_insert_info ON info;
CREATE TRIGGER tri_insert_info
    AFTER INSERT OR UPDATE ON info
    FOR EACH ROW EXECUTE FUNCTION fun_info();
JDBC实现消息通信
package test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Demonstrates PostgreSQL LISTEN/NOTIFY over JDBC: one thread periodically
 * sends a notification on channel {@code mymessage} and another thread polls
 * for and prints the notifications it receives.
 */
public class NotificationTest {

    public static void main(String[] args) throws Exception {
        Class.forName("org.postgresql.Driver");
        String url = "jdbc:postgresql://127.0.0.1:5432/postgres";

        // Create two distinct connections, one for the notifier
        // and another for the listener to show the communication
        // works across connections although this example would
        // work fine with just one connection.
        Connection lConn = DriverManager.getConnection(url, "postgres", "postgres");
        Connection nConn = DriverManager.getConnection(url, "postgres", "postgres");

        // Create two threads, one to issue notifications and
        // the other to receive them.
        Listener listener = new Listener(lConn);
        Notifier notifier = new Notifier(nConn);
        listener.start();
        notifier.start();
    }

}

/**
 * Polls a connection that has issued {@code LISTEN mymessage} and prints every
 * notification it receives. Runs until the thread is interrupted.
 */
class Listener extends Thread {

    private final Connection conn;
    private final org.postgresql.PGConnection pgconn;

    /**
     * Registers the given connection as a listener on channel {@code mymessage}.
     *
     * @param conn an open PostgreSQL connection; it is cast to
     *             {@code org.postgresql.PGConnection}
     * @throws SQLException if the LISTEN command cannot be executed
     */
    Listener(Connection conn) throws SQLException {
        this.conn = conn;
        this.pgconn = (org.postgresql.PGConnection) conn;
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("LISTEN mymessage");
        }
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                pollOnce();
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            }

            // Wait a while before checking again for new notifications.
            // The sleep is outside the SQL try block so that a failing
            // connection does not turn this loop into a busy spin.
            try {
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Contacts the backend once and prints any notifications that are pending. */
    private void pollOnce() throws SQLException {
        // Issue a dummy query to contact the backend
        // and receive any pending notifications.
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 1")) {
            // The result itself is irrelevant; only the round trip matters.
        }

        org.postgresql.PGNotification[] notifications = pgconn.getNotifications();
        if (notifications == null) {
            return;
        }
        for (org.postgresql.PGNotification notification : notifications) {
            System.out.println("Got notification: " + notification.getName()
                    + " with payload: " + notification.getParameter());
        }
    }
}

/**
 * Sends {@code NOTIFY mymessage,'123'} on its connection every two seconds.
 * Runs until the thread is interrupted.
 */
class Notifier extends Thread {

    private final Connection conn;

    /**
     * @param conn an open connection used to issue the NOTIFY commands
     */
    public Notifier(Connection conn) {
        this.conn = conn;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("NOTIFY mymessage,'123'");
                System.out.println("111111 notification: ");
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            }

            // Sleep outside the SQL try block so a failing connection
            // still backs off between attempts.
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop sending.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
更多JDBC链接
https://jdbc.postgresql.org/documentation/81/index.html