创建数据/收集数据 + 从 PI 数据到 PC + 实时用户界面 + 到 PLC
Get_Data
----------
import csv
import os
import random
from datetime import datetime
import logging
import time
# 配置日志记录
logging.basicConfig(filename='D:/_Study/Case/Great_Data/log.txt',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
def 记录日志(消息):
logging.info(消息)
print(消息)
def 生成随机记录():
# 生成随机数据
aX_U_1 = random.uniform(-0.29969177, 0.29969177)
aY_U_1 = random.uniform(0.6930143, 2.29969177)
aZ_U_1 = random.uniform(-0.6866665, 0.69969177)
timestamp = datetime.now().strftime("%Y/%m/%d %H:%M:%S:%f")[:-3] # 格式化时间戳
aX_D_1 = random.uniform(-0.38232422, 0.69969177)
aY_D_1 = random.uniform(0.5595703, 0.79969177)
aZ_D_1 = random.uniform(0.7294922, 2.7294922)
# 数据记录列表
record = [
aX_U_1,
aY_U_1,
aZ_U_1,
timestamp,
aX_D_1,
aY_D_1,
aZ_D_1
]
return record
def 追加到CSV(文件路径, 记录, 表头=None):
# 尝试写入CSV文件
try:
if not os.path.exists(文件路径):
# 如果文件不存在,则创建新文件并写入表头
with open(文件路径, mode='w', newline='', encoding='utf-8') as 文件:
writer = csv.writer(文件)
if 表头:
writer.writerow(表头)
writer.writerow(记录)
else:
# 否则追加记录
with open(文件路径, mode='a', newline='', encoding='utf-8') as 文件:
writer = csv.writer(文件)
writer.writerow(记录)
记录日志(f"记录已追加到 {文件路径}")
except PermissionError:
记录日志("权限拒绝:请确保目录可访问,并尝试以管理员身份运行脚本。")
except Exception as e:
记录日志(f"写入CSV时发生错误:{e}")
# 定义文件路径
基础目录 = 'D:/_Study/Case/Great_Data/Data'
os.makedirs(基础目录, exist_ok=True) # 创建目录(如果不存在)
# 文件名称模板
文件名模板 = 'AA@BB@{index}@{time}.csv'
文件索引 = 0
# 获取所有符合条件的文件
文件列表 = []
for 文件名 in os.listdir(基础目录):
if 文件名.startswith('AA@BB@'):
文件路径 = os.path.join(基础目录, 文件名)
文件列表.append(文件路径)
# 按照时间戳排序文件列表
文件列表.sort(key=lambda x: int(os.path.basename(x).split('@')[2].split('.')[0]), reverse=True)
# 只保留最新的两个文件
保留文件列表 = 文件列表[:1]
删除文件列表 = 文件列表[1:]
# 删除多余的文件,并记录到日志
for 文件路径 in 删除文件列表:
os.remove(文件路径)
记录日志(f"{文件路径} 已被删除。")
# 如果没有文件,则创建第一个文件
if not 保留文件列表:
文件索引 += 1
当前时间 = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
文件路径 = os.path.join(基础目录, 文件名模板.format(index=文件索引, time=当前时间))
表头 = [ 'aX_U_1', 'aY_U_1', 'aZ_U_1', '时间戳','aX_D_1', 'aY_D_1', 'aZ_D_1']
追加到CSV(文件路径, 表头, 表头=表头)
记录日志(f"{文件路径} 已创建并写入表头。")
保留文件列表.append(文件路径)
# 开始生成记录
记录总数 = 0
while 文件索引 <= 1:
for 文件路径 in 保留文件列表:
if 记录总数 >= 10:
文件索引 += 1
if 文件索引 > 1:
记录日志("所有文件均已达到最大行数。")
break
当前时间 = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
文件路径 = os.path.join(基础目录, 文件名模板.format(index=文件索引, time=当前时间))
表头 = [ 'aX_U_1', 'aY_U_1', 'aZ_U_1', '时间戳','aX_D_1', 'aY_D_1', 'aZ_D_1']
追加到CSV(文件路径, 表头, 表头=表头)
记录日志(f"{文件路径} 已创建并写入表头。")
保留文件列表.append(文件路径)
记录总数 = 0
记录 = 生成随机记录()
追加到CSV(文件路径, 记录)
记录总数 += 1
记录日志(f"当前 {文件路径} 总行数为:{记录总数}")
if 文件索引 > 2:
break
记录日志("处理完成。")
print("Task completed. Will reboot in 3 minutes.")
time.sleep(180) # 等待180秒
print("Rebooting now...")
os.system('sudo reboot')
'''
[Unit]
Description=Great Data Service
After=network.target
[Service]
User=pi
WorkingDirectory=/home/pi/Great_Data
ExecStart=/usr/bin/python3 /home/pi/Great_Data/Great_data.py
[Install]
WantedBy=multi-user.target
'''
----------
Copy+Del Data
----------
import pyautogui
import time
import os
import shutil
import datetime
#10 秒计时后开始运行
pyautogui.countdown(3)
#到达位置鼠标左键单击
pyautogui.click(1154,1038,button='left')#任务栏图标
time.sleep(0.1)
pyautogui.click(1158,910,button='left')#VNC程序
time.sleep(0.1)
pyautogui.click(659,65,button='left')#PI程序
time.sleep(0.1)##
pyautogui.click(1285,366,button='left')#MENU 菜单
time.sleep(0.1)
pyautogui.click(1170,486,button='left')#File Transfer
time.sleep(0.1)
pyautogui.click(743,677,button='left')#SendFile
time.sleep(0.1)
pyautogui.click(953,498,button='left')#File选择
time.sleep(0.1)
pyautogui.click(1039,615,button='left')#OK
time.sleep(0.1)
pyautogui.click(1039,616,button='left')#桌面路径
time.sleep(0.1)
pyautogui.click(1002,762,button='left')#确定
time.sleep(0.1)
pyautogui.click(1298,300,button='left')#X1
time.sleep(0.1)
pyautogui.click(1190,373,button='left')#X2
time.sleep(0.1)
pyautogui.click(1295,333,button='left')#X3
time.sleep(0.1)
pyautogui.click(1157,1037,button='left')##VNC程序
time.sleep(0.1)
pyautogui.click(1465,874,button='left')#Close File Transfere
time.sleep(0.1)
#到达位置鼠标左键双击
pyautogui.doubleClick(x=1298, y=301, button="left")
time.sleep(1.2)
#pyautogui.click(922,68,button='left')
#time.sleep(1)
#存储截图
im = pyautogui.screenshot()
im.save(r'D:\GGY\屏幕截图.png')
#####获取最新文件,并单独复制至处理文件夹
# 定义源目录和目标目录
src_dir = r'C:\Users\Administrator\Desktop\Great_Data\Data'
dst_dir = r'C:\Users\Administrator\Desktop\Great_Data\Data\Send'
# 如果目标目录不存在,则创建之
if not os.path.exists(dst_dir):
os.makedirs(dst_dir)
# 获取目录下所有的文件及其修改日期,并找到最新的那个
files = [os.path.join(src_dir, f) for f in os.listdir(src_dir) if os.path.isfile(os.path.join(src_dir, f))]
files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
# 检查是否找到了文件
if files:
# 复制最新的文件
latest_file = files[0]
print(f'Copying the latest file: {latest_file}')
shutil.copy2(latest_file, dst_dir) # 使用shutil.copy2来保持元数据
# 删除其他的旧文件
for file in files[1:]:
print(f'Deleting old file: {file}')
try:
os.remove(file)
except OSError as e:
print(f"Error: {e.strerror} - {file}")
# 记录操作日志到目标目录下的 log.txt 文件
log_file_path = os.path.join(dst_dir, 'log.txt')
with open(log_file_path, 'a') as log_file:
log_file.write(f'{datetime.datetime.now()}: Latest file copied and old files deleted.\n')
else:
print('No files found in the directory.')
#####获取最新文件,并单独复制至处理文件夹
#填入参数, 第一参数是输入内容,第二个参数是每个字符间的间隔时间;
#pyautogui.moveTo(1241,311)
#count=(r'd:\aa')
#for c in str(count):
# pyautogui.keyDown(c)
#填入参数, 第一参数是输入内容,第二个参数是每个字符间的间隔时间;
#pyautogui.click(915,87,button='left')
#time.sleep(2)
#pyautogui.click(364,284,button='left')
#count=(2 )
#for cc in str(count):
# pyautogui.keyDown(cc)
#实时显示坐标
#pyautogui.displayMousePosition()
'''
import pyautogui
import time
#10 秒计时后开始运行
pyautogui.countdown(3)
#实时显示坐标d:\aa
while True:
x,y=pyautogui.position()
print('Pos:',(x,y))
'''
----------
From data to Chart
----------
##
import os
import pygame
import pandas as pd
import glob
import logging
from pygame.locals import *
# 配置日志记录
log_path = r'D:\Study\Case\Chart_RealTime\log.txt'
logging.basicConfig(filename=log_path, level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# 初始化pygame
pygame.init()
# 设置窗口大小和标题
WINDOW_WIDTH = 1920
WINDOW_HEIGHT = 1080
WINDOW_TITLE = "UI"
screen = pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))
pygame.display.set_caption(WINDOW_TITLE)
# 定义颜色
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GREEN = (0, 255, 0)
BLUE = (0, 0, 255)
ORANGE = (255, 165, 0)
PURPLE = (128, 0, 128)
BROWN = (139, 69, 19)
BLACK = (0, 0, 0)
# 源目录路径
source_dir = r'C:\Users\Administrator\Desktop\Great_Data\Data\Send'
# 查找目录下的所有CSV文件
csv_files = glob.glob(os.path.join(source_dir, '*.csv'))
if not csv_files:
print("未找到CSV文件,请检查目录路径是否正确。")
else:
# 用于绘制图表的数据
data = []
# 定义处理时间戳的函数
def process_timestamp(ts):
try:
if ':' in ts:
ts, microseconds = ts.rsplit(':', 1)
microseconds = microseconds.ljust(3, '0')
ts += '.' + microseconds
else:
ts += '.000'
except Exception as e:
logging.error(f"Failed to process timestamp {ts}: {e}")
ts = ts + '.000'
return ts
# 遍历每个CSV文件
for csv_file in csv_files:
logging.info("Reading CSV file...")
上一篇: Vue3 中的 30 个高频重点面试问题
下一篇: git 基础知识 -- 查找文件内容
推荐阅读
-
创建数据/收集数据 + 从 PI 数据到 PC + 实时用户界面 + 到 PLC
-
go语言Socket编程-Socket编程 什么是Socket Socket,英文含义是插座、插孔,一般称之为套接字,用于描述IP地址和端口。可以实现不同程序间的数据通信。 Socket起源于Unix,而Unix基本哲学之一就是“一切皆文件”,都可以用“打开open –> 读写write/read –> 关闭close”模式来操作。Socket就是该模式的一个实现,网络的Socket数据传输是一种特殊的I/O,Socket也是一种文件描述符。Socket也具有一个类似于打开文件的函数调用:Socket,该函数返回一个整型的Socket描述符,随后的连接建立、数据传输等操作都是通过该Socket实现的。 套接字的内核实现较为复杂,不宜在学习初期深入学习,了解到如下结构足矣。 套接字通讯原理示意 在TCP/IP协议中,“IP地址+TCP或UDP端口号”唯一标识网络通讯中的一个进程。“IP地址+端口号”就对应一个socket。欲建立连接的两个进程各自有一个socket来标识,那么这两个socket组成的socket pair就唯一标识一个连接。因此可以用Socket来描述网络连接的一对一关系。 常用的Socket类型有两种:流式Socket(SOCK_STREAM)和数据报式Socket(SOCK_DGRAM)。流式是一种面向连接的Socket,针对于面向连接的TCP服务应用;数据报式Socket是一种无连接的Socket,对应于无连接的UDP服务应用。 网络应用程序设计模式 C/S模式 传统的网络应用设计模式,客户机(client)/服务器(server)模式。需要在通讯两端各自部署客户机和服务器来完成数据通信。 B/S模式 浏览器(Browser)/服务器(Server)模式。只需在一端部署服务器,而另外一端使用每台PC都默认配置的浏览器即可完成数据的传输。 优缺点 对于C/S模式来说,其优点明显。客户端位于目标主机上可以保证性能,将数据缓存至客户端本地,从而提高数据传输效率。且,一般来说客户端和服务器程序由一个开发团队创作,所以他们之间所采用的协议相对灵活。可以在标准协议的基础上根据需求裁剪及定制。例如,腾讯所采用的通信协议,即为ftp协议的修改剪裁版。 因此,传统的网络应用程序及较大型的网络应用程序都首选C/S模式进行开发。如,知名的网络游戏魔兽世界。3D画面,数据量庞大,使用C/S模式可以提前在本地进行大量数据的缓存处理,从而提高观感。 C/S模式的缺点也较突出。由于客户端和服务器都需要有一个开发团队来完成开发。工作量将成倍提升,开发周期较长。另外,从用户角度出发,需要将客户端安插至用户主机上,对用户主机的安全性构成威胁。这也是很多用户不愿使用C/S模式应用程序的重要原因。 B/S模式相比C/S模式而言,由于它没有独立的客户端,使用标准浏览器作为客户端,其工作开发量较小。只需开发服务器端即可。另外由于其采用浏览器显示数据,因此移植性非常好,不受平台限制。如早期的偷菜游戏,在各个平台上都可以完美运行。 B/S模式的缺点也较明显。由于使用第三方浏览器,因此网络应用支持受限。另外,没有客户端放到对方主机上,缓存数据不尽如人意,从而传输数据量受到限制。应用的观感大打折扣。第三,必须与浏览器一样,采用标准http协议进行通信,协议选择不灵活。 因此在开发过程中,模式的选择由上述各自的特点决定。根据实际需求选择应用程序设计模式。 简单的C/S模型通信 Server端:Listen函数 func Listen(network, address string) (Listener, error) network:选用的协议:TCP、UDP, 如:“tcp”或 “udp” address:IP地址+端口号, 如:“127.0.0.1:8000”或 “:8000” Listener 接口: type Listener interface { Accept (Conn, error) Close error Addr Addr } Conn 接口: type Conn interface { Read(b byte) (n int, err error) Write(b byte) (n int, err error) Close error LocalAddr Addr RemoteAddr Addr SetDeadline(t time.Time) error SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error } 参看 [<u>https://studygolang.com/pkgdoc</u>](https://studygolang.com/pkgdoc) 中文帮助文档中的demo: 示例代码:TCP服务器.go package main import ( "net" "fmt" ) func main { // 创建监听 listener, err:= net.Listen("tcp", ":8000") if err != nil { fmt.Println("listen err:", err) return } defer listener.Close // 主协程结束时,关闭listener fmt.Println("服务器等待客户端建立连接...") // 等待客户端连接请求 conn, err := listener.Accept if err != nil { fmt.Println("accept err:", err) return } defer conn.Close // 使用结束,断开与客户端链接 fmt.Println("客户端与服务器连接建立成功...") // 接收客户端数据 buf := make(byte, 1024) // 创建1024大小的缓冲区,用于read n, err := conn.Read(buf) if err != nil { fmt.Println("read err:", err) return } fmt.Println("服务器读到:", string(buf[:n])) // 读多少,打印多少。 }