在当今的数字时代,各行业的专业人士都必须及时了解即将举行的活动、会议和研讨会。然而,在浩瀚的在线信息海洋中有效地找到符合自己兴趣的事件是一项重大挑战。

本博客介绍了应对这一挑战的创新解决方案:一个综合应用程序,旨在从 Facebook 抓取事件数据并使用 MyScale。虽然 MyScale 通常与 RAG 技术堆栈相关联或用作矢量数据库,但其功能超出了这些领域。我们将利用它进行数据分析,利用其矢量搜索功能来分析语义相似的事件,从而提供更好的结果和见解。

您可能会注意到,Grok AI 使用 Qdrant 矢量数据库作为搜索引擎从 X(以前称为 Twitter)数据中检索实时信息。您还可以通过将 MyScale 与 Apify 等其他平台集成,以这种方式使用 MyScale 评估矢量数据库的功能,通过开发简单的个性化应用程序来增强日常生活任务。

因此,在本博客中,我们将开发一个仅将城市名称作为输入并从 Facebook 抓取所有相关事件的应用程序。随后,我们将利用MyScale先进的SQL向量能力进行数据分析和语义搜索。

工具和技术

我们将使用多种工具(包括 Apify、MyScale 和 OpenAI)来开发这个有用的应用程序。

注意:不要忘记在上面的脚本中添加您的 Apify API 密钥。您可以在 Apify 控制台的集成页面上找到您的 API 令牌.

数据预处理

当我们收集原始数据时,它有多种格式。在此脚本中,我们将把事件日期转换为单一格式,以便更有效地完成数据过滤。

Python

 

 # 导入数据操作和日期解析所需的库
将 pandas 导入为 pd
将 numpy 导入为 np
从日期时间导入日期时间
从 dateutil 导入解析器

# 解析可能表示范围或单个日期的日期字符串的函数
def parse_dates(date_str):
    # 检查日期字符串是否包含破折号,表示范围
    如果 date_str 中有“-”:
        parts = date_str.split('-')
        # 如果字符串分成两部分,则它是有效范围
        如果 len(部分) == 2:
            尝试:
                # 解析开始和结束日期,将它们格式化为可读格式
                start_date = parser.parse(parts[0], fuzzy=True).strftime('%a, %b %d')
                end_date = parser.parse(parts[1], fuzzy=True).strftime('%a, %b %d')
                返回开始日期、结束日期
            除了值错误:
                # 如果出现解析错误,则不执行任何操作(将在下面处理)
                经过
    # 如果不是范围或者解析范围失败,请尝试解析为单个日期
    尝试:
        parsed_date = parser.parse(date_str, fuzzy=True)
        # 对 start_date 格式化单个日期,对 end_date 格式化不同
        start_date = parsed_date.strftime('%a, %b %d AT %I:%M %p EDT')
        end_date = parsed_date.strftime('%a, %b %d') # 省略 end_date 的时间
        返回开始日期、结束日期
    除了值错误:
        # 如果解析失败则返回 NaN
        返回 np.nan, np.nan

# 从日期字符串中提取详细日期、时间和日期的函数
def extract_date_time_day(date_str):
    尝试:
        # 解析日期字符串,允许输入格式具有一定的灵活性
        parsed_date = parser.parse(date_str, fuzzy=True)
        # 提取并格式化日期、时间和日期部分
        日期 = parsed_date.strftime('%Y-%m-%d')
        日 = parsed_date.strftime('%a')
        # 判断原始字符串是否包含时间成分
        time_component = parsed_date.strftime('%I:%M %p') 不在 date_str 中
        time = parsed_date.strftime('%H:%M:%S') 如果不是 time_component 否则 np.nan
    除了值错误:
        # 如果解析失败,则将日期、时间和日期设置为 NaN
        日期、时间、日 = np.nan、np.nan、np.nan
    
    返回日期、时间、日期

# 在数据框中应用 parse_dates 函数,为开始日期和结束日期创建新列
dataframe1[['Start_Date', 'End_Date']] = dataframe1.apply(lambda row: pd.Series(parse_dates(row['Datetime'])), axis=1)

# 删除 Start_Date 为 NaN 的行,表示解析不成功
dataframe = dataframe1.dropna(子集=['Start_Date'])

# 应用 extract_date_time_day 将开始日期和结束日期拆分为单独的日期、时间和日期列
dataframe['Start_Date'], dataframe['Start_Time'], dataframe['Start_Day'] = zip(*dataframe['Start_Date'].apply(extract_date_time_day))
dataframe['End_Date'], _, dataframe['End_Day'] = zip(*dataframe['End_Date'].apply(extract_date_time_day))

# 删除原来的“日期时间”列,因为不再需要它
dataframe=dataframe.drop(['日期时间'], axis=1)

# 将“Start_Date”和“End_Date”转换为日期时间格式,仅提取日期部分
dataframe['Start_Date'] = pd.to_datetime(dataframe['Start_Date']).dt.date
dataframe['End_Date'] = pd.to_datetime(dataframe['End_Date']).dt.date

# 将“Start_Time”转换为日期时间格式,保留时间信息
dataframe['Start_Time'] = pd.to_datetime(dataframe['Start_Time'])

此代码段使用 pandas 以及 Python 的 datetimedateutil 包来格式化数据。

生成嵌入

为了深入理解和搜索事件,我们将使用 text-embedding-3-small 根据事件的描述生成嵌入。这些嵌入捕获每个事件的语义本质,帮助应用程序返回更好的结果。

Python

 

 # 导入 OpenAI 库以进行 API 访问。
从 openai 导入 OpenAI
# 使用 API 密钥初始化 OpenAI 客户端。
openai_client = OpenAI(api_key="your_openai_api_key_here")
# 获取文本嵌入的函数
def get_embedding(text, model="text-embedding-3-small"):
    返回 openai_client.embeddings.create(input=text, model=model).data
嵌入 = get_embedding(dataframe["描述"].tolist())
# 从嵌入对象中提取嵌入向量
向量 = [embedding.embedding 用于嵌入嵌入]
数组 = np.array(向量)
embeddings_series = pd.Series(列表(数组))
# 将嵌入添加为 DataFrame 中的新列。
dataframe['Description_Embeddings'] = embeddings_series

现在,我们将嵌入新的 DataFrame 插入到 MyScale 中。

与 MyScale 连接

正如一开始所讨论的,我们将使用 MyScale 作为矢量数据库来存储和管理数据。在这里,我们将连接到 MyScale 准备数据存储。

Python

 

 导入 clickhouse_connect
客户端 = clickhouse_connect.get_client(
    主机='此处主机名',
    端口=443,
    用户名='用户名_此处',
    密码='passwd_here'
)

此连接设置确保我们的应用程序可以与 MyScale 通信,并使用 SQL 的强大功能进行数据操作和分析。

注意:请参阅连接详细信息了解有关如何连接到 MyScale 集群的更多信息。

使用 MyScale 创建表和索引

我们现在根据 DataFrame 创建一个表。所有数据都将存储在该表中,包括嵌入。

Python

 

 client.command("""
    创建表default.Events (
    名称字符串,
    描述字符串,
    用户_Going Int64,
    Users_感兴趣的 Int64,
    Users_Responded Int64,
    城市弦,
    按字符串组织,
    街道地址字符串,
    开始日期日期32,
    结束日期可为空(Date32),
    Start_Time可为空(DateTime64),
    开始日字符串,
    End_Day 字符串,
    描述_嵌入数组(Float32),
    约束 check_data_length 检查长度(描述_嵌入)= 1536
    ) 引擎 = MergeTree()
    订购依据(姓名);
""")

上述 SQL 语句在集群上创建一个名为 Events 的表。 CONSTRAINT 确保所有向量嵌入的长度都相同 1536

在 MyScale 中存储数据并创建索引

在这一步中,我们将处理后的数据插入到 MyScale 中。这涉及批量插入数据以确保高效的存储和检索。

Python

 

 batch_size = 10 # 根据需要调整

num_batches = len(dataframe) // 批量大小

对于范围内的 i(num_batches):
    start_idx = i * 批量大小
    结束 idx = 开始 idx + 批量大小
    批处理数据=数据帧[start_idx:end_idx]
    # print(batch_data["Description_Embeddings"])
    client.insert("default.Events",batch_data.to_records(index=False).tolist(),column_names=batch_data.columns.tolist())
    print(f"已插入批次 {i+1}/{num_batches}。")

客户端.命令("""
ALTER TABLE 默认值.事件
    添加向量索引向量_索引描述_嵌入
    类型 MSTG
""")

使用pandas,上述代码有效地将我们准备好的数据集传输到MyScale数据库中。

使用 MyScale 进行数据分析

最后,我们使用 MyScale 的分析功能来执行分析并启用语义搜索。通过执行 SQL 查询,我们可以根据主题、位置和日期来分析事件。那么,让我们尝试编写一些查询。

简单 SQL 查询

我们首先尝试从表格中获取前 10 个结果

Python

 

 结果=client.query("""
        从默认值中选择名称、描述。事件限制 10
    “””)
对于 results.named_results() 中的行:
        打印(行[“名称”])
打印(行['描述'])

此查询将仅返回 events 表中的前 10 个结果。

按语义相关性发现事件

让我们尝试查找与参考活动类似的十大即将举办的活动,如下所示:“该国持续时间最长的节目之一 – 自 1974 年开始运营……现在我们的50周年!!!我们的斯克内克塔迪”。这是通过比较事件描述的语义嵌入来实现的,确保主题和情感匹配。

Python

 

embeddings=get_embedding([“全国运行时间最长的节目之一 - 自 1974 年开始运营......现在是我们的第 50 周年了!!!我们的 Schenectady”])
嵌入=嵌入[0].嵌入
结果 = client.query(f"""
        选择名称、描述、
        distance(Description_Embeddings, {embedding}) as dist FROM default.Events ORDER BY dist LIMIT 10
    “””)
对于 results.named_results() 中的行:
        print("事件标题", row["名称"])
        print("事件描述", row['描述'])
        print("距离:", row["dist"])

按受欢迎程度划分的趋势事件

此查询按参加者和感兴趣的用户数量对前 10 个活动进行排名,突出显示从大城市节日到大型会议的热门活动。对于那些想要参加大型、充满活力的聚会的人来说,它是理想的选择。

Python

 

 结果 = client.query(f"""
        选择姓名、城市、用户_前往、用户_感兴趣、用户_已回复
        来自默认事件
        按用户_去 DESC、用户_感兴趣 DESC 排序
        限制 10
    “””)
对于 results.named_results() 中的行:
        print("事件名称", row["名称"])
        print("城市", 行["城市"])
        print("用户去", row["Users_Going"])
print("感兴趣的用户", row["Users_Interested"])

纽约当地热门活动

结合相关性和受欢迎程度,此查询可识别与特定活动相关的纽约市的类似活动,并按出席人数对它们进行排名,提供反映城市充满活力的文化并吸引当地人的精选活动列表兴趣。

Python

 

 embeddings=get_embedding([“全国运行时间最长的节目之一 - 自 1974 年开始运营......现在是我们的第 50 周年了!!!我们的 Schenectady”])
embeddi=嵌入[0].嵌入
结果 = client.query(f"""
        选择名称、城市、描述、Users_Going、距离(Description_Embeddings, {embeddi}) 作为 dist
        来自默认事件
        其中城市如“%New York%”且距离 < 1.5
        ORDER BY Users_Going DESC,dist
        限制 10
    “””)
对于 results.named_results() 中的行:
        print("事件名称", row["名称"])
        print("描述", 行["描述"])
print("Users Going ", row["Users_Going"])

领先的活动组织者

此查询根据与会者和感兴趣的用户总数对前 10 名活动组织者进行排名,突出显示那些擅长举办引人注目的活动并吸引大量受众的组织者。它为对顶级活动感兴趣的活动策划者和参与者提供见解。

Python

 

 # 哪个客户端吸引的用户数量最多
结果 = client.query(f"""
       SELECT Organized_By, SUM(Users_Going + Users_Interested) AS Total_Users
        来自默认事件
        GROUP BY 组织者
        按 T​​otal_Users DESC 排序
        限额 10
    “””)
对于 results.named_results() 中的行:
        print("事件名称", row["组织者"])
        print("Total_Users", row["Total_Users"])

实现RAG

之前,我们探索了 MyScale 的数据分析功能,重点介绍了其增强数据工作流程的功能。展望未来,我们将通过实施检索增强生成 (RAG) 向前迈出一步,这是一种将外部知识库与法学硕士相结合的创新框架。此步骤将帮助您更好地了解数据并找到更详细的见解。接下来,您将了解如何将 RAG 与 MyScale 结合使用,这将使数据处理变得更加有趣和高效。

Python

 

 从 langchain.chat_models 导入 ChatOpenAI
从 langchain.prompts 导入 ChatPromptTemplate

query="你能给我推荐一些与篮球相关的活动吗"
# 使用上面定义的 get_embedding 方法,它接受一个句子列表
嵌入=get_embedding([查询])
嵌入=嵌入[0].嵌入
结果 = client.query(f"""
        选择名称、城市、Users_Going、描述、距离(Description_Embeddings, {embeddings}) 作为 dist
        来自默认事件
        ORDER BY Users_Going DESC,dist
        限制 5
    “””)
PROMPT_TEMPLATE = """
您的目标是仅使用以下提供的信息来回答问题:
{语境}
---
您的任务是仔细分析所提供的上下文,并仅根据所提供的信息回答以下问题:
{问题}
”“”
# 合并排名靠前的结果的描述。
描述 = [结果中的行[“描述”].named_results()]
context_text = "\n\n---\n\n".join(描述)
Prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
提示=提示模板.格式(上下文=上下文文本,问题=查询)
模型 = ChatOpenAI(openai_api_key="your_api_key_here")
response_text = model.predict(提示)
formatted_response = f"响应:{response_text}\n"
打印(formatted_response)

通过本博客,我们发现 MyScale 不仅仅是一个可用于开发各种应用程序的矢量数据库。我们可以将其用作简单的 SQL 数据库,也可以用于高级 AI 应用程序,涵盖了大部分开发领域。我们鼓励您尝试一下,通过注册免费套餐并获得 500 万个免费矢量存储空间来探索高级功能。

结论

我们通过开发事件分析应用程序的过程,探索了 MyScale 与 Apify Scraper 的能力和功能。 MyScale展示了其在高性能矢量搜索方面的卓越能力,同时保留了SQL数据库的所有功能,这有助于开发人员使用熟悉的SQL语法进行语义搜索,速度和准确性大大提高。

MyScale 的功能不仅限于此应用程序:您可以采用它来使用 RAG 方法。

如果您有任何反馈或建议,请联系我们。

Comments are closed.