small-child-at-foot-of-large-stairs

我们用于数据分析的技术最近有了很大的发展。良好的旧关系数据库系统每天都变得不那么流行。现在,我们必须通过几种新技术找到我们的方法,这些技术可以处理大型(和流式处理)数据,最好是在分布式环境中。

Python现在风靡一时,但当然也有很多选择。SQL总是闪耀,而其他一些老的,但黄金,我们永远不能低估,仍然在那里。

因此,确实有广泛的替代方案。让我们浏览一下其中一些,好吗?

我将在这篇文章中定义一个简单的挑战,并提供十种用十种不同的技术编写的解决方案:

Awk

Mapreduce

Perl

Bash

蜂巢

Sql

Scala

Python Mongodb

它们共同代表了过去30多年!

使用这些技术,我们将使用Grouplens 网站提供的两个 CSV 数据集列出 10 部最受欢迎的电影。

您可能还喜欢:如何:在分布式 SQL 数据库上运行的 PGexercises 后greSQL 教程。

数据集

我们将使用 MovieLens 100K 数据集。实际上,只有以下两个文件从存档:

  • u.data是选项卡分隔文件,用于保存分级,并包含四列:

    • 用户\id (int), 电影\id (int), 评级 (int), 时间 (int)

  • u.item是一个管道 (*) 分隔文件。我们只需要从这里获取电影标题,但有几个列:

    • 电影Int, 电影标题varchar (200), 发布日期, 视频释放日期, imdbUrl varchar (300), 标志根斯未知int, 旗根斯行动int, 旗根斯冒险int, 旗子的动画int, 旗子儿童 int,旗体犯罪int, 旗文事件场, 旗文天场, 旗文天Int, 旗文学Int, 旗子,音乐,文体,标志,文体,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国家,国旗,国旗,国家,国旗,国旗,国旗,国旗,国家,国旗,国旗,国家,国旗,国家,国旗,国旗,国家,国旗,国家,国旗,国旗,国旗,国旗,国家,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,战国,国旗,国旗,国旗,国旗,国旗,国旗,无面,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,国旗,旗 根代战争 int, 旗 根西国际

我们的目标

我们将聚合分级数据(u.data)以计算每个电影_id的平均评分,并查找平均评分最高的 10 部电影。

我们将忽略那些收视率不到一百的电影。否则,我们会找到很多 5 星级电影,这些电影仅由一两个用户进行评级。因此,我们将过滤掉它们。

然后,我们将使用与 的联接来获取影片标题。因此,我们将使用 联接来获取影片数据(u.item)。结果将包含影片_id、影片 Title 和平均评级,如下所示。

1

408

关闭扫描,A (1995)

4.4910714285714286

2

318

辛德勒名单 (1993)

4.4664429530201342

3

169

裤子错误, (1993)

4.4661016949152542

4

483

卡萨布兰卡 (1942)

4.4567901234567901

5

64

肖尚克救赎, (1994)

4

3875598086124402

7

12

普通嫌疑犯,(1995年)

4.3857677902621723

8

50

星球大战 (1977)

4.3584905660377358

9

178

12 愤怒的男人 (1957)

4.3440000000000000

10

134

公民凯恩 (1941)

4.2929292929292929

现在,我们准备出发了。

1. AWK

Flightless boi

AWK 几乎和我一样古老,但它仍然是 @nix 下文本处理最强大的工具。你不一定认为它替代你最喜欢的编程语言,但它绝对值得尝试,特别是当你处理巨大的文本处理任务。

很多人将 AWK 与其他技术结合使用,以利用其在文本处理方面的巨大功能。

以下是针对我们挑战的 AWK 解决方案。这是一个单行—没有上传,没有临时文件,我们甚至不需要脚本文件!

join -1 2 -2 1 <(sort -n -k2 u.data) <(sort -n -k1 u.item | tr '|' '\t' | tr ' ' '~') | sort -n -k1 | cut -d' ' -f1,2,3,4,5 | tr ' ' '|' | awk 'BEGIN{FS="|";lastMovie=0;totalRating=0;countRating=0;lastMovieTitle=""} {if(lastMovie==$1) {totalRating=totalRating+$3;countRating=countRating+1} else {if(lastMovie!=0) {print(lastMovie " " lastMovieTitle " " totalRating " " countRating " " (totalRating/countRating))};lastMovie=$1;lastMovieTitle=$5;countRating=1;totalRating=$3}} END{print(lastMovie " " lastMovieTitle " " totalRating " " countRating " " (totalRating/countRating))}' | awk '{if($4>=100) print}' | sort -r -k5 | head -10 | tr ' ' '\t' | tr '~' ' '

对于那些不熟悉Linux的人来说,这看起来像是线路噪音。所以,让我们在这里给出一些解释。

第 1 步:联接数据集

联接接收两个数据集,u.datau.item,并联接它们以生成单个数据集。它使用第一个数据集 (-1 2) 中的第二列将第二个数据集 (-2 1) 中的第一列作为联接条件进行匹配。

u.数据联接操作之前在第二列 (-k2) 上排序。u.item是管道分隔的,但我们最好在联接之前将其更改为选项卡分隔格式。我这样做与tr。

我使用第二个tr将空格(在电影标题中)替换为波浪(+)字符。这是因为默认情况下,join命令输出是空间分隔的,并且在自定义分隔符时它很错误。这就是为什么我们摆脱了这里的空间。稍后我们将将它们更改回空格。

第 2 步:排序、剪切和 TR

联接的数据集按影片 ID 排序。这是第一列上的数字排序。(-n -k1)。然后,我使用剪切来获取前五列。我不需要u.item文件中的所有这些列。最后,tr将空间分隔文件转换为管道分隔文件。

Output after sorting, cutting, and tr

分拣、切割和 tr 后输出

第 3 步:AWK

AWK循环访问联接的数据集,该数据集按电影分类

此第一个awk命令的输出每个影片有一行,平均分级。

第 4 步:再次 AWK

第二个 awk用于筛选分级小于 100 的影片。

第 5 步:排序、头和 TR

然后,我们按电影的收视率对电影进行排序,并使用来获取前 10 部电影。最后,我们使用tr将输出转换为选项卡分隔格式,并将波浪线返回为空格。

2. PERL

Hump day!

为什么这么多人讨厌佩尔,我无法理解。这是一种可爱的编程语言,您不需要加密代码。这是因为,正如基思·博斯蒂奇已经说过的,Perl是”在RSA加密之前和之后看起来相同的唯一语言”。

最近,佩尔的人气下降引发了关于Perl逐渐消失的讨论。毫无疑问,它远不如上世纪90年代那么受欢迎。

但还是.比巴什快多了…它预安装在大多数 linux 发行版中…此外,Perl 从一开始就专注于报表处理。为什么不呢?让我们看看我们如何使用Perl处理这个排名前十的电影报告。

#!/usr/bin/perl use strict;
use warnings;

open (fle_ratings, '<', 'u.data') or die "Could not open u.data: $!";

my %hash1 = ();
while ()
{
    chomp $_;
    my ($user_id, $movie_id, $rating) = split(/\t/, $_);
    if(exists($hash1{$movie_id})){
       $hash1{$movie_id}[0]+=$rating;
       $hash1{$movie_id}[1]+=1;
    } else {
       $hash1{$movie_id}[0]=$rating;
       $hash1{$movie_id}[1]=1;
    }
    #print "$hash1{$movie_id}[0] *** $hash1{$movie_id}[1] \n"
   }
my %hash2 = ();
foreach my $key (keys %hash1)
{
    if ($hash1{$key}[1] >= 100) {
       $hash2{$key}=$hash1{$key}[0] / $hash1{$key}[1];
    }
}
close fle_ratings;

my $counter=0;
foreach my $movid (sort { $hash2{$b} <=> $hash2{$a} or $a cmp $b } keys %hash2) {
    my $movie='';
    open(fle_movies, '<', 'u.item') or die "Could not open u.item: $!";
    while ()
    {
       chomp $_;
       my ($movie_id, $movie_title) = split(/\|/, $_);
       if($movid==$movie_id){
          $movie=$movie_title;
          last;
       }
    }
    print "$movid $movie $hash2{$movid}\n";
    last if ++$counter == 10;
}

好吧,这是一个Perl脚本,我看不出有什么好的理由来讨厌它。也许这个奇怪的参数排序,但我可以忍受它。

我会把它放在一个文本文件中,使它可执行,并直接执行它。您可能认为所有这些循环都是性能杀手,但事实并非如此:Perl 会及时返回结果。

Ouput from Perl script

来自Perl脚本的欧普特

在 AWK 中的单行线之后,此脚本看起来过大,不是吗?

让我们深入了解一下此代码。

而循环

我们循环访问评级数据集,并填充名为哈希 1哈希值。哈希 1 将保留评分的总和和计数。

第一个福奇循环

现在,我们处理哈希1的每个成员,用平均值填充名为哈希2的新哈希值。

第二个福奇循环

我们处理hash2的每个成员,但在对值应用降序排序后因此,这些都是前10名评级的电影。

对于其中每部电影,我们搜索电影数据集中的电影标题。这是我们的 foreach循环内的一个一段时间循环。一找到我们的电影,我们就打破循环。

3. BASH

Image title

最流行的Linux外壳不需要任何介绍。我将直接跳转到 BASH脚本解决方案。

fle="u.data"
declare -a ratings
for movid in $(cut -f2 $fle | sort | uniq)
do
    countLines=$(grep "^[0-9]*\s$movid\s" $fle | cut -f3 | wc -l)
    sumRatings=$(grep "^[0-9]*\s$movid\s" $fle | cut -f3 | paste -sd+ | bc)
    avgRating=$(eval "echo 'scale=6; $sumRatings/$countLines' | bc")
    if [ $countLines -gt 100 ]
    then
        ratings[$movid]=$avgRating
    fi
done
for k in "${!ratings[@]}"
do
  echo $k'|'${ratings["$k"]}'|'$(grep ""^$k\|"" u.item | cut -d"|" -f2)
done | sort -r -t'|' -k2 | head -10

这一次,这是一个不同的方法。

cut -f2 $fle | sort | uniq 

它会给我分类不同的电影ID列表。我循环遍历每个电影 ID 并计算行数,即为该影片给出的评分计数。

正则表达式 ^[0-9]*\s$movid\s 为我提供了包含第二列中特定影片 ID 的 ^ [0-9]* \s 行。

我还计算这里的评级总和。剪切 -f3grep将返回特定影片的所有分级值。粘贴将帮助产生一个文本,结合这些评级值,与分隔符”+”,bc将计算此求和的结果。

然后,我将循环浏览我的分级数组,找到每个影片的标题,并打印 10 个得分最高的值。

Final output in Bash

Bash 中的最终输出

尽管它看起来像一个更简单的解决方案,但它最多需要 30 秒才能完成。丑陋的佩尔轻松胜过巴什!

4. SQL(后SQL)

Image title

对于我们大多数人来说,最简单的方法是将数据加载到我们最喜欢的 RDBMS 中,并编写 SQL 查询来生成结果。我将使用 PostgreSQL。

首先,我将更改您.item 文件的编码(如果您遇到电影标题的编码问题,您可能需要此):

iconv -f ISO-8859-1 -t UTF-8 u.item > movie_def.txt

然后,让我们创建表并将数据加载到表中:

postgres=# \c olric

You are now connected to database "olric" as user "postgres".

olric=# create table ratings (userId int, movieId int, rating int, timestamp int);

CREATE TABLE
olric=# create table movies (movieId int, movieTitle varchar(200), releaseDate date, videoReleaseDate date, imdbUrl varchar(300), flagGenreUnknown int, flagGenreAction int, flagGenreAdventure int, flagGenreAnimation int, flagGenreChildrens int, flagGenreComedy int, flagGenreCrime int, flagGenreDocumentary int, flagGenreDrama int, flagGenreFantasy int, flagGenreFilmNoir int, flagGenreHorror int, flagGenreMusical int,  flagGenreMystery int, flagGenreRomance int, flagGenreSciFi int, flagGenreThriller int, flagGenreWar int, flagGenreWestern int);

CREATE TABLE
olric=# COPY ratings FROM '/home/oguz/10_Solutions/u

txt’(格式 csv,分隔符’+,强制\空(视频释放日期));
副本 1682

这里是 SQL 提供结果:

olric=# WITH avgRatings AS (SELECT movieId, AVG(rating) AS avgRating FROM ratings GROUP BY movieId HAVING COUNT(*) >= 100) SELECT m.movieId, m.movieTitle, a.avgRating FROM movies m JOIN avgRatings a ON m.movieId=a.movieId ORDER BY a.avgRating DESC LIMIT 10;

     408 | Close Shave, A (1995)            | 4.4910714285714286

     318 | Schindler's List (1993)          | 4.4664429530201342

     169 | Wrong Trousers, The (1993)       | 4.4661016949152542

     483 | Casablanca (1942)                | 4.4567901234567901

      64 | Shawshank Redemption, The (1994) | 4.4452296819787986

     603 | Rear Window (1954)               | 4.3875598086124402

      12 | Usual Suspects, The (1995)       | 4.3857677902621723

      50 | Star Wars (1977)                 | 4.3584905660377358

     178 | 12 Angry Men (1957)              | 4.3440000000000000

     134 | Citizen Kane (1941)              | 4.2929292929292929

5. 与熊猫的Python

Image title

Python 已经非常流行,成为数据科学的选择。如果它跟上步伐,Python可能会在几年内成为世界上最流行的编程语言。目前Python排名第三,仅次于Java和C。

以下 Python 解决方案使用熊猫库,这使得数据分析任务变得如此简单。

import pandas as pd
ratings = pd.read_csv('u.data', delimiter='\t', names = ['userId', 'movieId', 'rating', 'ratingTime'])
movies = pd.read_csv('u.item', delimiter='|', usecols=[0,1], names = ['movieId', 'movieTitle'])
joined=pd.merge(ratings, movies, how='inner', on='movieId')
averages=joined.groupby(['movieId','movieTitle']).agg({'rating':'mean', 'userId':'count'})
averages.columns=['avgRating', 'countRating']
print(averages[averages.countRating>=100].sort_values(by=['avgRating'], ascending=False).head(10))

因此,这比 SQL 查询更可读的代码,不是吗?

Output with Postgres

带后置的输出

6. 使用 Python 中的 MR 作业进行映射

Image title

您可能最好使用不太复杂的工具,如 Pig、Hive 或 Spark,但 MapReduce 是 Apache Hadoop 下处理数据的典型方式。

让我们来看看我们如何使用 MapReduce 应对我们的挑战。为此,我将再次使用 Python,但这次是使用 MRJob 库

from mrjob.job import MRJob
from mrjob.step import MRStep
import csv
class RatingsBreakdown(MRJob):

    def movie_title(self, movid):
    with open("/home/oguz/10_Solutions/u.item", "r") as infile:
        reader = csv.reader(infile, delimiter='|')
        next(reader)
        for line in reader:
            if int(movid) == int(line[0]):
                return line[1]
    def steps(self):
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
        MRStep(mapper=self.mapper2, reducer=self.reducer2)
        ]

    def mapper1(self, _, line):
        (userID, movieID, rating, timestamp) = line

电影\标题(int(键))

如果 [名称] = “{main}”:
评级细分.run()

我想在这里我们需要一些解释。

steps(self)给我们的地图减少工作。在我们的案例中定义了两个步骤。

每个步骤可以由映射器、组合器和减速器组成。虽然它们都是可选的,但步骤将至少由其中一个步骤组成。我们的两个步骤都由一个映射器和一个减速器组成。

我们的第一步映射器 (mapper1) 使用tab作为分隔符拆分 u.data 文件的所有行.我们现在有所有四个列在手,但我们只对电影 ID 和分级感兴趣,因此映射器返回这两个值。

Output from mapper

映射器的输出

映射器不聚合数据。因此,如果有n行进入,映射器输出也是n行。

第一步的减速器 (减速器1) 用于计算每个电影 ID 的平均额定值.减速器接收影片 ID 作为键,并将分级作为

默认情况下,聚合是值。我们只需要计算聚合值并使用收益率返回它。

所有映射器和缩减器返回键和值对。reducer1 的返回值将影片 ID 作为键,将平均分级作为值。

现在数据是聚合的,reducer1 的输出每个影片 ID 有一个(并且只有一个)行。

我们的第二步(mapper2)的映射器将电影ID移出键。变为空值 (),现在是电影 ID 和平均分级的列表。

这是因为我们想要找到收视率最高的电影。下一个减速器应扫描整个数据集并查找最高评分的影片。为了确保扫描所有数据,我们必须清空密钥 – 否则,减少器将单独对所有键进行操作。

减器1对值上的数据进行排序。值是一个列表,其第一个成员是平均评级,因此我们的反向排序循环将从最高评分的影片开始,并止于第十行。

7. 猪拉丁文

Image title

猪拉丁让我们有机会使用比MapReduce本身更简单的表示法。因此,它是一个高级工具,可以在 MapReduce(或 Tez 或 Spark)中执行作业

猪拉丁解决方案,我们的挑战是在这里:

ratings = LOAD '/user/hdpuser/movies/u.data' AS (userid:int, movieid:int, rating:int, time:int);

grp_ratings = GROUP ratings BY movieid;

avg_rat = FOREACH grp_ratings GENERATE group as movid, AVG(ratings.rating) as avgRating , COUNT(ratings.movieid) as cnt_rat;

avg_ratings = FILTER avg_rat BY cnt_rat >= 100;

movies = LOAD '/user/hdpuser/movies/u.item' USING PigStorage('|') AS (movieid:int, moviename:chararray);

joined = JOIN avg_ratings BY movid, movies BY movieid;

dataset = FOREACH joined GENERATE movies::moviename as movnam, avg_ratings::avgRating as avgRating;

ordered = ORDER dataset BY avgRating desc;

top10 = LIMIT ordered 10;

DUMP top10;

代码本身非常不言自明,因此我将跳过此处的解释。

8. 蜂巢

Image title

就像猪一样,Hive 提供了一个更简单的平台来处理 Apache Hadoop 上的数据因此,我们将在 Hive 控制台下创建表,并在 Hive 下物理存储数据。

create database olric;

CREATE EXTERNAL TABLE IF NOT EXISTS olric.ratings_temp
(userId INT, movieId INT, rating INT, ratingTime INT)

ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION 'hdfs://dikanka:8020/user/oguz/MovieData/ratings';

select * from olric.ratings_temp limit 10;

CREATE EXTERNAL TABLE IF NOT EXISTS olric.movies_temp
(movieId int, movieTitle varchar(200), releaseDate date, videoReleaseDate date, imdbUrl varchar(300), flagGenreUnknown int, flagGenreAction int, flagGenreAdventure int, flagGenreAnimation int, flagGenreChildrens int, flagGenreComedy int, flagGenreCrime int, flagGenreDocumentary int, flagGenreDrama int, flagGenreFantasy int, flagGenreFilmNoir int, flagGenreHorror int, flagGenreMusical int,  flagGenreMystery int, flagGenreRomance int, flagGenreSciFi int, flagGenreThriller int, flagGenreWar int, flagGenreWestern int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'hdfs://dikanka:8020/user/oguz/MovieData/movies';

select movieId, movieTitle from movies_temp limit 10;

我在 Hive 控制台中创建了一个名为 olric 的数据库。我创建了两个外部表,指向我的 u.data 和 Hadoop 上的 you.item 文件。

我们仍然没有在 Hive 下物理存储数据。我们现在将这样做:

CREATE TABLE IF NOT EXISTS olric.ratings
(userId INT, movieId INT, rating INT)
STORED AS ORC;

INSERT OVERWRITE TABLE olric.ratings SELECT userId, movieId, rating FROM olric.ratings_temp;

select count(*) from olric.ratings;

CREATE TABLE IF NOT EXISTS olric.movies
(movieId int, movieTitle varchar(200))
STORED AS ORC;

INSERT OVERWRITE TABLE olric.movies SELECT movieId, movieTitle FROM olric.movies_temp;

select count(*) from olric.movies;

现在,我们已经有了 Hive 表,我们可以使用老旧的 SQL 技能来编写以下 HiveQL:

with rat as (select movieId, avg(rating) as avgRating, count(*) as cnt from olric.ratings GROUP BY movieId) select rat.movieId, mov.movieTitle, rat.avgRating from rat join olric.movies mov on rat.movieId=mov.movieId where cnt >= 100 order by avgRating desc limit 10;

INFO  : OK

+--------------+-----------------------------------+---------------------+

| rat.movieid  |          mov.movietitle           |    rat.avgrating    |

+--------------+-----------------------------------+---------------------+

| 408          | Close Shave, A (1995)             | 4.491071428571429   |

| 318          | Schindler's List (1993)           | 4.466442953020135   |

| 169          | Wrong Trousers, The (1993)        | 4.466101694915254   |

| 483          | Casablanca (1942)                 | 4.45679012345679    |

| 64           | Shawshank Redemption, The (1994)  | 4.445229681978798   |

| 603          | Rear Window (1954)                | 4.3875598086124405  |

| 12           | Usual Suspects, The (1995)        | 4.385767790262173   |

| 50           | Star Wars (1977)                  | 4.3584905660377355  |

| 178          | 12 Angry Men (1957)               | 4.344               |

| 134          | Citizen Kane (1941)               | 4.292929292929293   |

+--------------+-----------------------------------+---------------------+

9. 与斯卡拉的火花

Image title

根据Tiobe指数列表,Scala仍然不像Cobol:)那么受欢迎——但你可以很容易地看到,在Scala上,炒作仍在继续。

它是一种函数式编程语言,是在 JVM(Java 虚拟机)上运行的其他语言com/文章/介绍-阿帕奇-火花”rel=”nofollow”_Spark 本身,是用Scala写的。如果你想学习Spark,这是喜欢Scala而不是Python的流行原因。

Spark 引入了 RDD(弹性分布式数据集)。请参阅下面的解决方案,了解想法。

package com.olric.samplePackage01

import org.apache.spark._
import org.apache.spark.rdd.RDD


object top10Movies extends App {

  val sc = new SparkContext("local[*]", "WordCount")


  val moviesFile = sc.textFile("hdfs://dikanka:8020/user/oguz/MovieData/u.item")
  val movies: RDD[(Int, String)] = moviesFile.map {
    line =>
      val col = line.split('|')
      (col(0).toInt, col(1))
  }.sortByKey()


  val ratingsFile = sc.textFile("hdfs://dikanka:8020/user/oguz/MovieData/u.data")
  val ratings: RDD[(Int, Int)] = ratingsFile.map {
    line =>
      val col = line.split("\t")
      (col(1).toInt, col(2).toInt)
  }

  val ratingsPerMovieId = ratings.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)).filter(x => x._2._2 >= 100)
  val avgRatings=ratingsPerMovieId.map(x => (x._1, (x._2._1 / x._2._2.toFloat)))
  val joinedDataset=avgRatings.join(movies)
  joinedDataset.sortBy(_._2, false).take(10).foreach(println)
}

因此,我们从 Hadoop 读取电影文件,并填充 RDD 命名影片。我们对评级也做了同样的规定。电影 RDD 包含影片 ID 和影片标题,而分级 RDD 具有影片 ID 和分级。到目前为止,它很简单。

对于不熟悉的人,填充分级PerMovieId的行可能有点复杂。我们从评级 RDD 开始。此处的每一行都是两个值的列表:

(电影ID,评级)

表达式:

x > (x,1)

是编写函数的快捷方式。它实际上是一个函数,它以 x 作为参数,并将 (x, 1) 值列表作为返回值。X,在这里,表示从输入读取的行,额定 RDD。

因此,映射值的输出如下所示:

(电影ID,(评分,1))

然后,我们使用 reduceByKey,它需要知道如何减少具有相同键值的多行。X 和 y 表示具有相同键值的两行,我们提供以下函数来减少 ByKey,以便它知道如何减少这些行:

(x,y) > (x.#1 = y.#1, x.#2 = y.#2)

x._1 代表输入行 x 的第一个值,即额定值。同样,x._2 指向第二个值,该值始终是一个值。

因此,第一个值和第二个值在这里总结,以找到总评分和评分计数,每个电影 ID。

然后我们使用另一个函数,

x > x.±2.±2 >= 100

以筛选我们的数据集。

x.*2 是一个(国际,Int)列表,它保存了我们的评级总计和评级计数。

x._2._2 是评级计数的 Int 值。因此,此函数将摆脱低于 100 评级的移动。

代码的其余部分更容易。我们加入两个 RDD,根据评级对结果进行排序,取前 10 行并列出它们。

10. 蒙戈德

Image title

如果没有 NoSQL 数据库,此帖子可能不完整。因此,下面是 Mongodb,一个面向文档的 NoSQL 数据库,位于 2009 年。

MongoDB 将数据存储为 JSON 文档。因此,我现在将上传我的CSV文件作为文档的集合。首先,让我们使用 mongodb 命令行接口创建数据库。

> use olric

switched to db olric

use如果数据库不存在,该命令将创建该数据库

现在,让我们回到BASH,并使用mongoimport实用程序上传我们的CSV文件。

oguz@dikanka:~/moviedata$ cat /home/oguz/moviedata/u.data | mongoimport --db olric --collection "ratings" --drop --type tsv --fields userId,movieId,rating,ratingTime --host "127.0.0.1:27017"

2019-10-09T16:24:24.477+0200    connected to: 127.0.0.1:27017
2019-10-09T16:24:24.478+0200    dropping: olric.ratings
2019-10-09T16:24:25.294+0200    imported 100000 documents

oguz@dikanka:~/moviedata$ cut -d"|" -f 1,2 /home/oguz/moviedata/u.item | tr "|" "\t" | mongoimport --db olric --collection "movies" --drop --type tsv --fields movieId,movieTitle --host "127.0.0.1:27017"

2019-10-09T16:26:00.812+0200    connected to: 127.0.0.1:27017
2019-10-09T16:26:00.812+0200    dropping: olric.movies
2019-10-09T16:26:01.118+0200    imported 1682 documents

TSV 代表选项卡分隔的 CSV 文件。由于 u.item 是管道分隔的,因此我使用 tr 将其转换为选项卡分隔格式,然后剪切以仅提取前两列。

回到蒙戈德布控制台内,以控制上传。

> use olric

switched to db olric

> db.ratings.find()

{ "_id" : ObjectId("5d9ded98c233e200b842a850"), "userId" : 253, "movieId" : 465, "rating" : 5, "ratingTime" : 891628467 }
{ "_id" : ObjectId("5d9ded98c233e200b842a851"), "userId" : 305, "movieId" : 451, "rating" : 3, "ratingTime" : 886324817 }
...

> db.movies.find()

{ "_id" : ObjectId("5d9dedf8c233e200b8442f66"), "movieId" : 7, "movieTitle" : "Twelve Monkeys (1995)" }
{ "_id" : ObjectId("5d9dedf8c233e200b8442f67"), "movieId" : 8, "movieTitle" : "Babe (1995)" }
{ "_id" : ObjectId("5d9dedf8c233e200b8442f68"), "movieId" : 9, "movieTitle" : "Dead Man Walking (1995)" }

...

以下是我们挑战的单哥布解决方案:

> db.ratings.aggregate([{$group: {_id: "$movieId", avgRating: {$avg : "$rating"}, count: {$sum : 1} }}, {$match : {count : {$gte : 100}}}, {$sort : {avgRating : -1}}, {$limit : 10}, {$lookup : {from: "movies", localField: "_id", foreignField: "movieId", as: "movieDef"}}, {$unwind : "$movieDef"}]).forEach(function(output) {print(output._id + "\t" + output.avgRating + "\t" + output.movieDef.movieTitle) })

您可能会因为这里使用的所有括号而迷路。

我们在集合中使用名为评级的方法聚合。聚合是一种集合方法;它接受多个管道阶段,如$group、$match、$sort、$limit、$lookup和$unwind。

你可以看到这些是我用的。舞台$groupmovieID对文档集合进行分组,并将几个计算字段添加到这些文档中。这些被命名为avgRating计数。

阶段$match筛选出计数小于 100 的文档。

猜猜$sort和$limit是哪个阶段?好的,我跳过这些。

$lookup从查找数据中查找与字段影片 Id 匹配的另一个集合。它将整个匹配行引入名为movieDef 的数组。

$unwind删除此数组,查找字段中的每个字段将成为文档集合中的单独字段。

forEach遍历文档,现在只有 10 个,并按分级排序。我们使用函数(输出)来打印结果。

我知道,这是一篇很长的帖子,但我们介绍了十种不同的技术,以便从两个数据集中编写一个聚合报告。

我希望它有助于快速了解这些技术。

进一步阅读

  • 卡夫卡教程为大家,不管你在发展阶段。
Comments are closed.